AMQ-6378 force recovery on corrupt metadata location info, relates to AMQ-6376

This commit is contained in:
gtully 2016-07-27 15:56:28 +01:00
parent 51b413309c
commit 822e2be90e
5 changed files with 73 additions and 38 deletions

View File

@ -751,8 +751,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private Location recoverProducerAudit() throws IOException {
if (metadata.producerSequenceIdTrackerLocation != null) {
KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
try {
KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
int maxNumProducers = getMaxFailoverProducersToTrack();
int maxAuditDepth = getFailoverProducersAuditDepth();
@ -773,8 +773,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@SuppressWarnings("unchecked")
private Location recoverAckMessageFileMap() throws IOException {
if (metadata.ackMessageFileMapLocation != null) {
KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
try {
KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
@ -3228,6 +3228,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return journal;
}
// Expose the store's Metadata (added in this change so recovery tests can
// read producerSequenceIdTrackerLocation / ackMessageFileMapLocation).
protected Metadata getMetadata() {
return metadata;
}
// Plain accessor for the failIfDatabaseIsLocked flag.
public boolean isFailIfDatabaseIsLocked() {
return failIfDatabaseIsLocked;
}

View File

@ -89,7 +89,7 @@ final class DataFileAccessor {
return new ByteSequence(data, 0, data.length);
} catch (RuntimeException e) {
throw new IOException("Invalid location: " + location + ", : " + e, e);
throw new IOException("Invalid location: " + location + " : " + e, e);
}
}

View File

@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.Map.Entry;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@ -42,6 +43,7 @@ import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys;
import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
@ -232,6 +234,48 @@ public class JournalCorruptionEofIndexRecoveryTest {
assertEquals("Drain", numToSend, drainQueue(numToSend));
}
// Variant that corrupts the producer-sequence-audit location (aOrB == false).
@Test
public void testRecoveryAfterProducerAuditLocationCorrupt() throws Exception {
doTestRecoveryAfterLocationCorrupt(false);
}
// Variant that corrupts the ack-message-file-map location (aOrB == true).
@Test
public void testRecoveryAfterAckMapLocationCorrupt() throws Exception {
doTestRecoveryAfterLocationCorrupt(true);
}
// Shared scenario for the two corruption tests: fill several journal data
// files, checkpoint, clobber one of the metadata locations in the journal
// (ack-map when the flag is true, producer audit otherwise), then restart
// the broker and verify that no messages were lost.
private void doTestRecoveryAfterLocationCorrupt(boolean aOrB) throws Exception {
    startBroker();
    produceMessagesToConsumeMultipleDataFiles(50);

    final int journalFileCount = getNumberOfJournalFiles();
    assertTrue("more than x files: " + journalFileCount, journalFileCount > 4);

    final KahaDBStore kahaDbStore = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore();
    kahaDbStore.checkpointCleanup(true);

    final Location target;
    if (aOrB) {
        target = kahaDbStore.getMetadata().ackMessageFileMapLocation;
    } else {
        target = kahaDbStore.getMetadata().producerSequenceIdTrackerLocation;
    }
    corruptLocation(target);

    restartBroker(false, false);
    assertEquals("missing no message", 50, broker.getAdminView().getTotalMessageCount());
    assertEquals("Drain", 50, drainQueue(50));
}
/**
 * Overwrite the start of the journal record at the given location with a
 * bogus int so a later load() of that location fails and forces recovery.
 *
 * @param toCorrupt journal location whose leading int is clobbered
 * @throws IOException if the journal data file cannot be accessed
 */
private void corruptLocation(Location toCorrupt) throws IOException {
    // Integer.valueOf over the deprecated new Integer(int) constructor.
    DataFile dataFile = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(toCorrupt.getDataFileId()));
    RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
    try {
        randomAccessFile.seek(toCorrupt.getOffset());
        randomAccessFile.writeInt(3);
    } finally {
        // release the file handle even if seek/write throws
        dataFile.closeRandomAccessFile(randomAccessFile);
    }
}
private void corruptBatchCheckSumSplash(int id) throws Exception{
Collection<DataFile> files =
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
@ -367,19 +411,30 @@ public class JournalCorruptionEofIndexRecoveryTest {
}
// Consume up to max messages from this test's destination via the shared
// static drain helper.
private int drainQueue(int max) throws Exception {
return drain(cf, destination, max);
}
public static int drain(ConnectionFactory cf, Destination destination, int max) throws Exception {
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
MessageConsumer consumer = null;
int count = 0;
try {
while (count < max && consumer.receive(5000) != null) {
consumer = session.createConsumer(destination);
while (count < max && consumer.receive(4000) != null) {
count++;
}
} catch (JMSException ok) {
} finally {
consumer.close();
connection.close();
if (consumer != null) {
try {
consumer.close();
} catch (JMSException ok) {}
}
try {
connection.close();
} catch (JMSException ok) {}
}
return count;
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.store.kahadb;
import static org.apache.activemq.store.kahadb.JournalCorruptionEofIndexRecoveryTest.drain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -36,6 +37,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.junit.After;
@ -265,16 +267,7 @@ public class JournalCorruptionIndexRecoveryTest {
}
/**
 * Consume up to {@code max} messages from the test destination.
 *
 * NOTE(review): stripped-diff residue — the superseded inline JMS loop was
 * interleaved with its replacement; reconstructed as the post-patch form,
 * which delegates to the shared helper in
 * JournalCorruptionEofIndexRecoveryTest (statically imported drain).
 */
private int drainQueue(int max) throws Exception {
    return drain(cf, destination, max);
}
}

View File

@ -43,6 +43,7 @@ import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.store.kahadb.JournalCorruptionEofIndexRecoveryTest.drain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@ -247,26 +248,8 @@ public class JournalFdRecoveryTest {
}
/**
 * Try to consume up to {@code numToGet} messages from {@code destination},
 * returning however many were received before timeout or failure.
 *
 * NOTE(review): stripped-diff residue — the old inline consume loop was
 * interleaved with its replacement; reconstructed as the post-patch form,
 * which builds a factory for the broker's transport connector and delegates
 * to the shared drain helper (statically imported).
 */
private int tryConsume(Destination destination, int numToGet) throws Exception {
    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
    return drain(cf, destination, numToGet);
}
private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {