[AMQ-6815] have checkpoint validate status of async writes to avoid stale metadata and validate location size on read to avoid potential oom on restart

This commit is contained in:
gtully 2017-09-19 16:51:00 +01:00
parent f82eccd2f5
commit 8c3ef6cadb
7 changed files with 159 additions and 10 deletions

View File

@ -2132,6 +2132,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
try { try {
location.getLatch().await(); location.getLatch().await();
if (location.getBatch().exception.get() != null) {
throw location.getBatch().exception.get();
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new InterruptedIOException(e.toString()); throw new InterruptedIOException(e.toString());
} }
@ -3135,7 +3138,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return index; return index;
} }
private Journal createJournal() throws IOException { protected Journal createJournal() throws IOException {
Journal manager = new Journal(); Journal manager = new Journal();
manager.setDirectory(directory); manager.setDirectory(directory);
manager.setMaxFileLength(getJournalMaxFileLength()); manager.setMaxFileLength(getJournalMaxFileLength());

View File

@ -84,7 +84,9 @@ final class DataFileAccessor {
} else { } else {
file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE); file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
} }
if ((long)location.getOffset() + location.getSize() > dataFile.length) {
throw new IOException("Invalid location size: " + location + ", size: " + location.getSize());
}
byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE]; byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE];
file.readFully(data); file.readFully(data);
return new ByteSequence(data, 0, data.length); return new ByteSequence(data, 0, data.length);

View File

@ -127,7 +127,7 @@ class DataFileAppender implements FileAppender {
Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync); Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
WriteBatch batch = enqueue(write); WriteBatch batch = enqueue(write);
location.setLatch(batch.latch); location.setBatch(batch);
if (sync) { if (sync) {
try { try {
batch.latch.await(); batch.latch.await();
@ -153,10 +153,8 @@ class DataFileAppender implements FileAppender {
location.setType(type); location.setType(type);
Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete); Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
location.setBatch(enqueue(write));
WriteBatch batch = enqueue(write);
location.setLatch(batch.latch);
return location; return location;
} }

View File

@ -36,7 +36,7 @@ public final class Location implements Comparable<Location> {
private int offset = NOT_SET; private int offset = NOT_SET;
private int size = NOT_SET; private int size = NOT_SET;
private byte type = NOT_SET_TYPE; private byte type = NOT_SET_TYPE;
private CountDownLatch latch; private DataFileAppender.WriteBatch batch;
public Location() { public Location() {
} }
@ -114,11 +114,11 @@ public final class Location implements Comparable<Location> {
} }
public CountDownLatch getLatch() { public CountDownLatch getLatch() {
return latch; return batch.latch;
} }
public void setLatch(CountDownLatch latch) { public void setBatch(DataFileAppender.WriteBatch batch) {
this.latch = latch; this.batch = batch;
} }
public int compareTo(Location o) { public int compareTo(Location o) {
@ -142,4 +142,7 @@ public final class Location implements Comparable<Location> {
return dataFileId ^ offset; return dataFileId ^ offset;
} }
public DataFileAppender.WriteBatch getBatch() {
return batch;
}
} }

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
@ -46,8 +47,11 @@ import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile; import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -192,6 +196,58 @@ public class JournalCorruptionEofIndexRecoveryTest {
assertEquals("Drain", 49, drainQueue(49)); assertEquals("Drain", 49, drainQueue(49));
} }
@Test
public void testRecoveryAfterCorruptionMetadataLocation() throws Exception {
startBroker();
produceMessagesToConsumeMultipleDataFiles(50);
int numFiles = getNumberOfJournalFiles();
assertTrue("more than x files: " + numFiles, numFiles > 2);
broker.getPersistenceAdapter().checkpoint(true);
Location location = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getMetadata().producerSequenceIdTrackerLocation;
DataFile dataFile = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId()));
RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
randomAccessFile.seek(location.getOffset());
randomAccessFile.writeInt(Integer.MAX_VALUE);
randomAccessFile.getChannel().force(true);
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().close();
try {
broker.stop();
broker.waitUntilStopped();
} catch (Exception expected) {
} finally {
broker = null;
}
AtomicBoolean trappedExpectedLogMessage = new AtomicBoolean(false);
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel() == Level.WARN
&& event.getRenderedMessage().contains("Cannot recover message audit")
&& event.getThrowableInformation().getThrowable().getLocalizedMessage().contains("Invalid location size")) {
trappedExpectedLogMessage.set(true);
}
}
};
org.apache.log4j.Logger.getRootLogger().addAppender(appender);
try {
restartBroker(false);
} finally {
org.apache.log4j.Logger.getRootLogger().removeAppender(appender);
}
assertEquals("no missing message", 50, broker.getAdminView().getTotalMessageCount());
assertTrue("Did replay records on invalid location size", trappedExpectedLogMessage.get());
}
@Test @Test
public void testRecoveryAfterCorruptionCheckSum() throws Exception { public void testRecoveryAfterCorruptionCheckSum() throws Exception {
startBroker(); startBroker();

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class MessageDatabaseTest {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void testCheckPointCleanupErrorBubblesUp() throws Exception {
CountDownLatch traceCommandComplete = new CountDownLatch(1);
KahaDBStore kaha = new KahaDBStore() {
public Journal createJournal() {
Journal journal = new Journal() {
public boolean isChecksum() {
// allow trace command on start
if (traceCommandComplete.getCount() > 0) {
traceCommandComplete.countDown();
return false;
}
// called from processQ, we can throw here to error out the async write
throw new RuntimeException("Fail with error on processQ");
}
};
journal.setDirectory(directory);
return journal;
}
};
kaha.setDirectory(new File(temporaryFolder.getRoot(), "kaha"));
kaha.setCheckpointInterval(0l); // disable periodic checkpoint
kaha.setBrokerService(new BrokerService() {
public void handleIOException(IOException exception) {
exception.printStackTrace();
}
});
kaha.start();
assertTrue(traceCommandComplete.await(5, TimeUnit.SECONDS));
try {
kaha.checkpoint(false);
fail("expect error on first store from checkpoint");
} catch (Exception expected) {
}
assertNull("audit location should be null", kaha.getMetadata().producerSequenceIdTrackerLocation);
}
}

View File

@ -181,6 +181,12 @@ public class DataFileAppenderNoSpaceNoBatchTest {
assertTrue("write complete", latch.await(5, TimeUnit.SECONDS)); assertTrue("write complete", latch.await(5, TimeUnit.SECONDS));
} }
boolean someExceptions = false;
for (Location location: locations) {
someExceptions |= (location.getBatch().exception != null);
}
assertTrue(someExceptions);
LOG.info("Latches count: " + latches.size()); LOG.info("Latches count: " + latches.size());
LOG.info("Seeks: " + seekPositions); LOG.info("Seeks: " + seekPositions);