https://issues.apache.org/jira/browse/AMQ-6083 - ignoreMissingJournalfiles false - check for missing acks and corruption anywhere and error out so that corruption does not go unnoticed - fix and test

(cherry picked from commit 5db5f3e39a)
This commit is contained in:
gtully 2015-12-09 12:30:03 +00:00 committed by Timothy Bish
parent 0952866273
commit 99fce5bae9
5 changed files with 125 additions and 33 deletions

View File

@ -825,14 +825,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
while (!ss.isEmpty()) {
missingJournalFiles.add((int) ss.removeFirst());
}
missingJournalFiles.removeAll(journal.getFileMap().keySet());
if (!missingJournalFiles.isEmpty()) {
if (LOG.isInfoEnabled()) {
LOG.info("Some journal files are missing: " + missingJournalFiles);
for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
missingJournalFiles.add(entry.getKey());
for (Integer i : entry.getValue()) {
missingJournalFiles.add(i);
}
}
missingJournalFiles.removeAll(journal.getFileMap().keySet());
if (!missingJournalFiles.isEmpty()) {
LOG.warn("Some journal files are missing: " + missingJournalFiles);
}
ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
for (Integer missing : missingJournalFiles) {
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
@ -842,10 +849,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
Collection<DataFile> dataFiles = journal.getFileMap().values();
for (DataFile dataFile : dataFiles) {
int id = dataFile.getDataFileId();
// eof to next file id
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
Sequence seq = dataFile.getCorruptedBlocks().getHead();
while (seq != null) {
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
BTreeVisitor.BetweenVisitor visitor = new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
missingPredicates.add(visitor);
knownCorruption.add(visitor);
seq = seq.getNext();
}
}
@ -862,7 +872,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
});
// If somes message references are affected by the missing data files...
// If some message references are affected by the missing data files...
if (!matches.isEmpty()) {
// We either 'gracefully' recover dropping the missing messages or
@ -879,12 +889,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// TODO: do we need to modify the ack positions for the pub sub case?
}
} else {
throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected.");
throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
}
}
}
}
if (!ignoreMissingJournalfiles) {
if (!knownCorruption.isEmpty()) {
LOG.error("Detected corrupt journal files. " + knownCorruption);
throw new IOException("Detected corrupt journal files. " + knownCorruption);
}
if (!missingJournalFiles.isEmpty()) {
LOG.error("Detected missing journal files. " + missingJournalFiles);
throw new IOException("Detected missing journal files. " + missingJournalFiles);
}
}
if( undoCounter > 0 ) {
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
// should do sync writes to the journal.
@ -1714,6 +1737,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// check we are not deleting file with ack for in-use journal files
if (LOG.isTraceEnabled()) {
LOG.trace("gc candidates: " + gcCandidateSet);
LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap);
}
Iterator<Integer> candidates = gcCandidateSet.iterator();
while (candidates.hasNext()) {
@ -1743,6 +1767,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
}
journal.removeDataFiles(gcCandidateSet);
boolean ackMessageFileMapMod = false;
for (Integer candidate : gcCandidateSet) {
for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
ackMessageFileMapMod |= ackFiles.remove(candidate);
}
}
if (ackMessageFileMapMod) {
checkpointUpdate(tx, false);
}
}
}

View File

@ -166,7 +166,7 @@ public interface BTreeVisitor<Key,Value> {
@Override
public String toString() {
return first+" <= key < "+last;
return first+" >= key < "+last;
}
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@ -40,6 +42,7 @@ import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,6 +56,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
private BrokerService broker = null;
private String connectionUri;
private KahaDBPersistenceAdapter adapter;
private boolean ignoreMissingJournalFiles = false;
private final Destination destination = new ActiveMQQueue("Test");
private final String KAHADB_DIRECTORY = "target/activemq-data/";
@ -118,7 +122,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
adapter.setCleanupInterval(5000);
adapter.setCheckForCorruptJournalFiles(true);
adapter.setIgnoreMissingJournalfiles(true);
adapter.setIgnoreMissingJournalfiles(ignoreMissingJournalFiles);
adapter.setPreallocationStrategy("zeros");
adapter.setPreallocationScope("entire_journal");
@ -132,6 +136,32 @@ public class JournalCorruptionEofIndexRecoveryTest {
}
}
@Before
public void reset() throws Exception {
ignoreMissingJournalFiles = true;
}
@Test
public void testNoRestartOnCorruptJournal() throws Exception {
ignoreMissingJournalFiles = false;
startBroker();
produceMessagesToConsumeMultipleDataFiles(50);
int numFiles = getNumberOfJournalFiles();
assertTrue("more than x files: " + numFiles, numFiles > 2);
corruptBatchEndEof(3);
try {
restartBroker(true);
fail("Expect failure to start with corrupt journal");
} catch (Exception expected) {
}
}
@Test
public void testRecoveryAfterCorruptionEof() throws Exception {
startBroker();

View File

@ -20,7 +20,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@ -131,35 +133,62 @@ public class AMQ2832Test {
*/
@Test
public void testAckChain() throws Exception {
startBroker();
startBroker();
StagedConsumer consumer = new StagedConsumer();
// file #1
produceMessagesToConsumeMultipleDataFiles(5);
// acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log
consumer.receive(3);
makeAckChain();
// send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together
// this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one
produceAndConsumeImmediately(20, consumer);
consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed
broker.stop();
broker.waitUntilStopped();
// now we have 3 files written and started with #4
consumer.close();
recoverBroker();
broker.stop();
broker.waitUntilStopped();
StagedConsumer consumer = new StagedConsumer();
Message message = consumer.receive(1);
assertNotNull("One message stays unacked from db-1.log", message);
message.acknowledge();
message = consumer.receive(1);
assertNull("There should not be any unconsumed messages any more", message);
consumer.close();
}
recoverBroker();
private void makeAckChain() throws Exception {
StagedConsumer consumer = new StagedConsumer();
// file #1
produceMessagesToConsumeMultipleDataFiles(5);
// acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log
consumer.receive(3);
// send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together
// this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one
produceAndConsumeImmediately(20, consumer);
consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed
// now we have 3 files written and started with #4
consumer.close();
}
@Test
public void testNoRestartOnMissingAckDataFile() throws Exception {
startBroker();
// reuse scenario from previous test
makeAckChain();
File dataDir = broker.getPersistenceAdapter().getDirectory();
broker.stop();
broker.waitUntilStopped();
File secondLastDataFile = new File(dataDir, "db-3.log");
LOG.info("Whacking data file with acks: " + secondLastDataFile);
secondLastDataFile.delete();
try {
doStartBroker(false, false);
fail("Expect failure to start with corrupt journal");
} catch (IOException expected) {
}
}
consumer = new StagedConsumer();
Message message = consumer.receive(1);
assertNotNull("One message stays unacked from db-1.log", message);
message.acknowledge();
message = consumer.receive(1);
assertNull("There should not be any unconsumed messages any more", message);
consumer.close();
}
private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception {
for (int i = 0; i < numOfMsgs; i++) {

View File

@ -108,7 +108,7 @@ public class AMQ4212Test {
}
@Test
public void testDirableSubPrefetchRecovered() throws Exception {
public void testDurableSubPrefetchRecovered() throws Exception {
ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");