mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6083 - ignoreMissingJournalfiles false - check for missing acks and corruption anywhere and error out so that corruption does not go unnoticed - fix and test
This commit is contained in:
parent
9c4ef26d5c
commit
5db5f3e39a
|
@ -825,14 +825,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
while (!ss.isEmpty()) {
|
while (!ss.isEmpty()) {
|
||||||
missingJournalFiles.add((int) ss.removeFirst());
|
missingJournalFiles.add((int) ss.removeFirst());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
|
||||||
|
missingJournalFiles.add(entry.getKey());
|
||||||
|
for (Integer i : entry.getValue()) {
|
||||||
|
missingJournalFiles.add(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
missingJournalFiles.removeAll(journal.getFileMap().keySet());
|
missingJournalFiles.removeAll(journal.getFileMap().keySet());
|
||||||
|
|
||||||
if (!missingJournalFiles.isEmpty()) {
|
if (!missingJournalFiles.isEmpty()) {
|
||||||
if (LOG.isInfoEnabled()) {
|
LOG.warn("Some journal files are missing: " + missingJournalFiles);
|
||||||
LOG.info("Some journal files are missing: " + missingJournalFiles);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
|
||||||
ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
|
ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
|
||||||
for (Integer missing : missingJournalFiles) {
|
for (Integer missing : missingJournalFiles) {
|
||||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
|
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
|
||||||
|
@ -842,10 +849,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
Collection<DataFile> dataFiles = journal.getFileMap().values();
|
Collection<DataFile> dataFiles = journal.getFileMap().values();
|
||||||
for (DataFile dataFile : dataFiles) {
|
for (DataFile dataFile : dataFiles) {
|
||||||
int id = dataFile.getDataFileId();
|
int id = dataFile.getDataFileId();
|
||||||
|
// eof to next file id
|
||||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
|
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
|
||||||
Sequence seq = dataFile.getCorruptedBlocks().getHead();
|
Sequence seq = dataFile.getCorruptedBlocks().getHead();
|
||||||
while (seq != null) {
|
while (seq != null) {
|
||||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
|
BTreeVisitor.BetweenVisitor visitor = new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
|
||||||
|
missingPredicates.add(visitor);
|
||||||
|
knownCorruption.add(visitor);
|
||||||
seq = seq.getNext();
|
seq = seq.getNext();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -862,7 +872,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// If somes message references are affected by the missing data files...
|
// If some message references are affected by the missing data files...
|
||||||
if (!matches.isEmpty()) {
|
if (!matches.isEmpty()) {
|
||||||
|
|
||||||
// We either 'gracefully' recover dropping the missing messages or
|
// We either 'gracefully' recover dropping the missing messages or
|
||||||
|
@ -879,12 +889,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
// TODO: do we need to modify the ack positions for the pub sub case?
|
// TODO: do we need to modify the ack positions for the pub sub case?
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
|
LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected.");
|
||||||
|
throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!ignoreMissingJournalfiles) {
|
||||||
|
if (!knownCorruption.isEmpty()) {
|
||||||
|
LOG.error("Detected corrupt journal files. " + knownCorruption);
|
||||||
|
throw new IOException("Detected corrupt journal files. " + knownCorruption);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!missingJournalFiles.isEmpty()) {
|
||||||
|
LOG.error("Detected missing journal files. " + missingJournalFiles);
|
||||||
|
throw new IOException("Detected missing journal files. " + missingJournalFiles);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if( undoCounter > 0 ) {
|
if( undoCounter > 0 ) {
|
||||||
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
|
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
|
||||||
// should do sync writes to the journal.
|
// should do sync writes to the journal.
|
||||||
|
@ -1714,6 +1737,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
// check we are not deleting file with ack for in-use journal files
|
// check we are not deleting file with ack for in-use journal files
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("gc candidates: " + gcCandidateSet);
|
LOG.trace("gc candidates: " + gcCandidateSet);
|
||||||
|
LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap);
|
||||||
}
|
}
|
||||||
Iterator<Integer> candidates = gcCandidateSet.iterator();
|
Iterator<Integer> candidates = gcCandidateSet.iterator();
|
||||||
while (candidates.hasNext()) {
|
while (candidates.hasNext()) {
|
||||||
|
@ -1743,6 +1767,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
|
LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
|
||||||
}
|
}
|
||||||
journal.removeDataFiles(gcCandidateSet);
|
journal.removeDataFiles(gcCandidateSet);
|
||||||
|
boolean ackMessageFileMapMod = false;
|
||||||
|
for (Integer candidate : gcCandidateSet) {
|
||||||
|
for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
|
||||||
|
ackMessageFileMapMod |= ackFiles.remove(candidate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ackMessageFileMapMod) {
|
||||||
|
checkpointUpdate(tx, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -166,7 +166,7 @@ public interface BTreeVisitor<Key,Value> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return first+" <= key < "+last;
|
return first+" >= key < "+last;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -40,6 +42,7 @@ import org.apache.activemq.util.ByteSequence;
|
||||||
import org.apache.activemq.util.IOHelper;
|
import org.apache.activemq.util.IOHelper;
|
||||||
import org.apache.activemq.util.RecoverableRandomAccessFile;
|
import org.apache.activemq.util.RecoverableRandomAccessFile;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -53,6 +56,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
|
||||||
private BrokerService broker = null;
|
private BrokerService broker = null;
|
||||||
private String connectionUri;
|
private String connectionUri;
|
||||||
private KahaDBPersistenceAdapter adapter;
|
private KahaDBPersistenceAdapter adapter;
|
||||||
|
private boolean ignoreMissingJournalFiles = false;
|
||||||
|
|
||||||
private final Destination destination = new ActiveMQQueue("Test");
|
private final Destination destination = new ActiveMQQueue("Test");
|
||||||
private final String KAHADB_DIRECTORY = "target/activemq-data/";
|
private final String KAHADB_DIRECTORY = "target/activemq-data/";
|
||||||
|
@ -118,7 +122,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
|
||||||
adapter.setCleanupInterval(5000);
|
adapter.setCleanupInterval(5000);
|
||||||
|
|
||||||
adapter.setCheckForCorruptJournalFiles(true);
|
adapter.setCheckForCorruptJournalFiles(true);
|
||||||
adapter.setIgnoreMissingJournalfiles(true);
|
adapter.setIgnoreMissingJournalfiles(ignoreMissingJournalFiles);
|
||||||
|
|
||||||
adapter.setPreallocationStrategy("zeros");
|
adapter.setPreallocationStrategy("zeros");
|
||||||
adapter.setPreallocationScope("entire_journal");
|
adapter.setPreallocationScope("entire_journal");
|
||||||
|
@ -132,6 +136,32 @@ public class JournalCorruptionEofIndexRecoveryTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void reset() throws Exception {
|
||||||
|
ignoreMissingJournalFiles = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoRestartOnCorruptJournal() throws Exception {
|
||||||
|
ignoreMissingJournalFiles = false;
|
||||||
|
|
||||||
|
startBroker();
|
||||||
|
|
||||||
|
produceMessagesToConsumeMultipleDataFiles(50);
|
||||||
|
|
||||||
|
int numFiles = getNumberOfJournalFiles();
|
||||||
|
|
||||||
|
assertTrue("more than x files: " + numFiles, numFiles > 2);
|
||||||
|
|
||||||
|
corruptBatchEndEof(3);
|
||||||
|
|
||||||
|
try {
|
||||||
|
restartBroker(true);
|
||||||
|
fail("Expect failure to start with corrupt journal");
|
||||||
|
} catch (Exception expected) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRecoveryAfterCorruptionEof() throws Exception {
|
public void testRecoveryAfterCorruptionEof() throws Exception {
|
||||||
startBroker();
|
startBroker();
|
||||||
|
|
|
@ -20,7 +20,9 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -133,6 +135,23 @@ public class AMQ2832Test {
|
||||||
public void testAckChain() throws Exception {
|
public void testAckChain() throws Exception {
|
||||||
startBroker();
|
startBroker();
|
||||||
|
|
||||||
|
makeAckChain();
|
||||||
|
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
|
||||||
|
recoverBroker();
|
||||||
|
|
||||||
|
StagedConsumer consumer = new StagedConsumer();
|
||||||
|
Message message = consumer.receive(1);
|
||||||
|
assertNotNull("One message stays unacked from db-1.log", message);
|
||||||
|
message.acknowledge();
|
||||||
|
message = consumer.receive(1);
|
||||||
|
assertNull("There should not be any unconsumed messages any more", message);
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void makeAckChain() throws Exception {
|
||||||
StagedConsumer consumer = new StagedConsumer();
|
StagedConsumer consumer = new StagedConsumer();
|
||||||
// file #1
|
// file #1
|
||||||
produceMessagesToConsumeMultipleDataFiles(5);
|
produceMessagesToConsumeMultipleDataFiles(5);
|
||||||
|
@ -146,20 +165,30 @@ public class AMQ2832Test {
|
||||||
|
|
||||||
// now we have 3 files written and started with #4
|
// now we have 3 files written and started with #4
|
||||||
consumer.close();
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoRestartOnMissingAckDataFile() throws Exception {
|
||||||
|
startBroker();
|
||||||
|
|
||||||
|
// reuse scenario from previous test
|
||||||
|
makeAckChain();
|
||||||
|
|
||||||
|
File dataDir = broker.getPersistenceAdapter().getDirectory();
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker.waitUntilStopped();
|
broker.waitUntilStopped();
|
||||||
|
|
||||||
recoverBroker();
|
File secondLastDataFile = new File(dataDir, "db-3.log");
|
||||||
|
LOG.info("Whacking data file with acks: " + secondLastDataFile);
|
||||||
|
secondLastDataFile.delete();
|
||||||
|
|
||||||
consumer = new StagedConsumer();
|
try {
|
||||||
Message message = consumer.receive(1);
|
doStartBroker(false, false);
|
||||||
assertNotNull("One message stays unacked from db-1.log", message);
|
fail("Expect failure to start with corrupt journal");
|
||||||
message.acknowledge();
|
} catch (IOException expected) {
|
||||||
message = consumer.receive(1);
|
|
||||||
assertNull("There should not be any unconsumed messages any more", message);
|
|
||||||
consumer.close();
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception {
|
private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception {
|
||||||
for (int i = 0; i < numOfMsgs; i++) {
|
for (int i = 0; i < numOfMsgs; i++) {
|
||||||
|
|
|
@ -108,7 +108,7 @@ public class AMQ4212Test {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDirableSubPrefetchRecovered() throws Exception {
|
public void testDurableSubPrefetchRecovered() throws Exception {
|
||||||
|
|
||||||
ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
|
ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
|
||||||
ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
|
ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
|
||||||
|
|
Loading…
Reference in New Issue