[AMQ-6771] do linear sequential scan of journal when validating checksums - remove batch reads via seek/read which depend on write batch size

This commit is contained in:
gtully 2017-07-17 12:18:25 +01:00
parent 56bed30c64
commit 8c218ee05d
8 changed files with 158 additions and 95 deletions

View File

@ -50,6 +50,8 @@ public class ByteSequence {
return offset;
}
/**
 * Reports how many bytes lie between the current {@code offset} and {@code length}.
 *
 * @return {@code length - offset}
 */
public int remaining() {
    final int unread = length - offset;
    return unread;
}
/**
 * Points this sequence at a different backing array. Only the {@code data} field is
 * assigned here; {@code offset} and {@code length} keep their current values.
 *
 * @param data the array this sequence should refer to from now on
 */
public void setData(final byte[] data) {
    this.data = data;
}
@ -71,8 +73,14 @@ public class ByteSequence {
}
}
/**
 * Compacts the sequence in place: the bytes from {@code offset} up to {@code length} are
 * moved to the front of the backing array, {@code length} becomes the number of bytes
 * moved and {@code offset} becomes zero.
 */
public void reset() {
    final int unread = remaining();
    length = unread;
    // source and destination are the same array; System.arraycopy copes with the overlap
    System.arraycopy(data, offset, data, 0, unread);
    offset = 0;
}
public int indexOf(ByteSequence needle, int pos) {
int max = length - needle.length;
int max = length - needle.length - offset;
for (int i = pos; i < max; i++) {
if (matches(needle, i)) {
return i;
@ -102,4 +110,16 @@ public class ByteSequence {
}
return -1;
}
/**
 * Tests whether the bytes at the current {@code offset} begin with the given prefix.
 *
 * @param bytes the prefix to look for
 * @return {@code true} when at least {@code bytes.length} bytes lie between
 *         {@code offset} and {@code length} and each of them equals the corresponding
 *         prefix byte; {@code false} otherwise
 */
public boolean startsWith(final byte[] bytes) {
    final int prefixLength = bytes.length;
    if (prefixLength > length - offset) {
        // not enough bytes left to hold the prefix
        return false;
    }
    int idx = 0;
    while (idx < prefixLength) {
        if (data[offset + idx] != bytes[idx]) {
            return false;
        }
        idx++;
    }
    return true;
}
}

View File

@ -65,6 +65,8 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
return pos - offset;
}
/**
 * @return the current value of {@code pos}, the index used for reads from the buffer
 */
public int position() {
    return pos;
}
/**
* @return the underlying data array
*/
@ -224,7 +226,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
}
public long readLong() throws IOException {
if (pos >= buf.length ) {
if (pos + 8 >= buf.length ) {
throw new EOFException();
}
long rc = ((long)buf[pos++] << 56) + ((long)(buf[pos++] & 255) << 48) + ((long)(buf[pos++] & 255) << 40) + ((long)(buf[pos++] & 255) << 32);

View File

@ -480,6 +480,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try {
IOHelper.mkdirs(directory);
if (deleteAllMessages) {
getJournal().setCheckForCorruptionOnStartup(false);
getJournal().start();
getJournal().delete();
getJournal().close();
@ -1048,7 +1049,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private Location getNextInitializedLocation(Location location) throws IOException {
Location mayNotBeInitialized = journal.getNextLocation(location);
if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
// need to init size and type to skip
return journal.getNextLocation(mayNotBeInitialized);
} else {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
import org.apache.activemq.util.ByteSequence;
@ -115,38 +116,6 @@ final class DataFileAccessor {
}
}
// public boolean readLocationDetailsAndValidate(Location location) {
// try {
// WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
// if (asyncWrite != null) {
// location.setSize(asyncWrite.location.getSize());
// location.setType(asyncWrite.location.getType());
// } else {
// file.seek(location.getOffset());
// location.setSize(file.readInt());
// location.setType(file.readByte());
//
// byte data[] = new byte[3];
// file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
// file.readFully(data);
// if (data[0] != Journal.ITEM_HEAD_SOR[0]
// || data[1] != Journal.ITEM_HEAD_SOR[1]
// || data[2] != Journal.ITEM_HEAD_SOR[2]) {
// return false;
// }
// file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
// file.readFully(data);
// if (data[0] != Journal.ITEM_HEAD_EOR[0]
// || data[1] != Journal.ITEM_HEAD_EOR[1]
// || data[2] != Journal.ITEM_HEAD_EOR[2]) {
// return false;
// }
// }
// } catch (IOException e) {
// return false;
// }
// return true;
// }
public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
@ -157,4 +126,8 @@ final class DataFileAccessor {
file.sync();
}
}
/**
 * Exposes the file handle held by this accessor.
 *
 * @return the {@code RecoverableRandomAccessFile} stored in {@code file}
 */
public RecoverableRandomAccessFile getRaf() {
    return this.file;
}
}

View File

@ -26,7 +26,6 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@ -90,12 +89,21 @@ public class Journal {
// with corruption on recovery we have no faith in the content - slip to the next batch record or eof
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
randomAccessFile.seek(recoveryPosition.getOffset() + 1);
byte[] data = new byte[getWriteBatchSize()];
ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
int nextOffset = 0;
if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
} else {
nextOffset = Math.toIntExact(randomAccessFile.length());
}
Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
// skip corruption on getNextLocation
recoveryPosition.setOffset((int) sequence.getLast() + 1);
recoveryPosition.setOffset(nextOffset);
recoveryPosition.setSize(-1);
dataFile.corruptedBlocks.add(sequence);
@ -463,21 +471,19 @@ public class Journal {
}
public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
int firstBatchRecordSize = -1;
if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
Location location = new Location();
location.setDataFileId(dataFile.getDataFileId());
location.setOffset(0);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
reader.readFully(0, firstFewBytes);
ByteSequence bs = new ByteSequence(firstFewBytes);
return bs.startsWith(EOF_RECORD);
} catch (Exception ignored) {
} finally {
accessorPool.closeDataFileAccessor(reader);
}
}
return firstBatchRecordSize == 0;
return false;
}
protected Location recoveryCheck(DataFile dataFile) throws IOException {
@ -487,9 +493,15 @@ public class Journal {
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
randomAccessFile.seek(0);
final long totalFileLength = randomAccessFile.length();
byte[] data = new byte[getWriteBatchSize()];
ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
while (true) {
int size = checkBatchRecord(reader, location.getOffset());
if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
int size = checkBatchRecord(bs, randomAccessFile);
if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
if (size == 0) {
// eof batch record
break;
@ -500,8 +512,8 @@ public class Journal {
// Perhaps it's just some corruption... scan through the
// file to find the next valid batch record. We
// may have subsequent valid batch records.
int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
if (nextOffset >= 0) {
if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
int nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
dataFile.corruptedBlocks.add(sequence);
@ -533,41 +545,33 @@ public class Journal {
return location;
}
private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
byte data[] = new byte[1024*4];
ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
int pos = 0;
while (true) {
pos = bs.indexOf(header, pos);
pos = bs.indexOf(header, 0);
if (pos >= 0) {
return offset + pos;
bs.setOffset(bs.offset + pos);
return pos;
} else {
// need to load the next data chunk in..
if (bs.length != data.length) {
if (bs.length != bs.data.length) {
// If we had a short read then we were at EOF
return -1;
}
offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
bs = new ByteSequence(data, 0, reader.read(offset, data));
pos = 0;
bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
bs.reset();
bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
}
}
}
public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);) {
reader.readFully(offset, controlRecord);
// check for journal eof
if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
// eof batch
return 0;
}
if (bs.startsWith(EOF_RECORD)) {
return 0; // eof
}
try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {
// Assert that it's a batch record.
for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
@ -578,28 +582,43 @@ public class Journal {
int size = controlIs.readInt();
if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
return -1;
return -2;
}
if (isChecksum()) {
long expectedChecksum = controlIs.readLong();
Checksum checksum = null;
if (isChecksum() && expectedChecksum > 0) {
checksum = new Adler32();
}
long expectedChecksum = controlIs.readLong();
if (expectedChecksum == 0) {
// Checksumming was not enabled when the record was stored.
// we can't validate the record :(
return size;
}
// revert to bs to consume data
bs.setOffset(controlIs.position());
int toRead = size;
while (toRead > 0) {
if (bs.remaining() >= toRead) {
if (checksum != null) {
checksum.update(bs.getData(), bs.getOffset(), toRead);
}
bs.setOffset(bs.offset + toRead);
toRead = 0;
} else {
if (bs.length != bs.data.length) {
// buffer exhausted
return -3;
}
byte data[] = new byte[size];
reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);
Checksum checksum = new Adler32();
checksum.update(data, 0, data.length);
if (expectedChecksum != checksum.getValue()) {
return -1;
toRead -= bs.remaining();
if (checksum != null) {
checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
}
bs.setLength(reader.read(bs.data));
bs.setOffset(0);
}
}
if (checksum != null && expectedChecksum != checksum.getValue()) {
return -4;
}
return size;
}
}

View File

@ -44,7 +44,7 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
raf = new RandomAccessFile(file, mode);
}
protected RandomAccessFile getRaf() throws IOException {
public RandomAccessFile getRaf() throws IOException {
if (raf == null) {
raf = new RandomAccessFile(file, mode);
}

View File

@ -64,10 +64,12 @@ public class JournalCorruptionEofIndexRecoveryTest {
private String connectionUri;
private KahaDBPersistenceAdapter adapter;
private boolean ignoreMissingJournalFiles = false;
private int journalMaxBatchSize;
private final Destination destination = new ActiveMQQueue("Test");
private final String KAHADB_DIRECTORY = "target/activemq-data/";
private final String payload = new String(new byte[1024]);
File brokerDataDir = null;
protected void startBroker() throws Exception {
doStartBroker(true, false);
@ -78,14 +80,13 @@ public class JournalCorruptionEofIndexRecoveryTest {
}
protected void restartBroker(boolean whackIndex, boolean forceRecoverIndex) throws Exception {
File dataDir = broker.getPersistenceAdapter().getDirectory();
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
if (whackIndex) {
File indexToDelete = new File(dataDir, "db.data");
File indexToDelete = new File(brokerDataDir, "db.data");
LOG.info("Whacking index: " + indexToDelete);
indexToDelete.delete();
}
@ -113,6 +114,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
cf = new ActiveMQConnectionFactory(connectionUri);
broker.start();
brokerDataDir = broker.getPersistenceAdapter().getDirectory();
LOG.info("Starting broker..");
}
@ -124,6 +126,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
// ensure there are a bunch of data files but multiple entries in each
adapter.setJournalMaxFileLength(1024 * 20);
adapter.setJournalMaxWriteBatchSize(journalMaxBatchSize);
// speed up the test case, checkpoint and cleanup early and often
adapter.setCheckpointInterval(5000);
adapter.setCleanupInterval(5000);
@ -146,6 +150,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
/**
 * Per-test defaults: {@code ignoreMissingJournalFiles} is switched on and
 * {@code journalMaxBatchSize} is set to {@code Journal.DEFAULT_MAX_WRITE_BATCH_SIZE}.
 * A test may overwrite either field before it starts the broker.
 */
@Before
public void reset() throws Exception {
    journalMaxBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    ignoreMissingJournalFiles = true;
}
@Test
@ -234,6 +239,20 @@ public class JournalCorruptionEofIndexRecoveryTest {
assertEquals("Drain", numToSend, drainQueue(numToSend));
}
/**
 * Runs with {@code journalMaxBatchSize} lowered to 2k: produces four messages, restarts
 * the broker with {@code forceRecoverIndex} set, and asserts that {@code drainQueue}
 * reports the same number of messages that were sent.
 */
@Test
public void testRecoverIndexWithSmallBatch() throws Exception {
    final int messageCount = 4;
    journalMaxBatchSize = 2 * 1024;
    startBroker();

    produceMessagesToConsumeMultipleDataFiles(messageCount);

    // the index file is kept (whackIndex=false); the restart passes forceRecoverIndex=true
    restartBroker(false, true);

    assertEquals("Drain", messageCount, drainQueue(messageCount));
}
@Test
public void testRecoveryAfterProducerAuditLocationCorrupt() throws Exception {

View File

@ -25,6 +25,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,6 +41,8 @@ import javax.management.Attribute;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -54,8 +57,7 @@ public class JournalFdRecoveryTest {
private static final Logger LOG = LoggerFactory.getLogger(JournalFdRecoveryTest.class);
private final String KAHADB_DIRECTORY = "target/activemq-data/";
private final String payload = new String(new byte[1024]);
private String payload;
private ActiveMQConnectionFactory cf = null;
private BrokerService broker = null;
private final Destination destination = new ActiveMQQueue("Test");
@ -63,6 +65,7 @@ public class JournalFdRecoveryTest {
private KahaDBPersistenceAdapter adapter;
public byte fill = Byte.valueOf("3");
private int maxJournalSizeBytes;
protected void startBroker() throws Exception {
doStartBroker(true);
@ -88,7 +91,6 @@ public class JournalFdRecoveryTest {
}
private void doCreateBroker(boolean delete) throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(delete);
broker.setPersistent(true);
@ -112,7 +114,7 @@ public class JournalFdRecoveryTest {
adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
// ensure there are a bunch of data files but multiple entries in each
adapter.setJournalMaxFileLength(1024 * 20);
adapter.setJournalMaxFileLength(maxJournalSizeBytes);
// speed up the test case, checkpoint and cleanup early and often
adapter.setCheckpointInterval(5000);
@ -132,6 +134,12 @@ public class JournalFdRecoveryTest {
}
}
/**
 * Restores the per-test defaults: a payload string built from 1024 zero bytes and a
 * {@code maxJournalSizeBytes} of 20 * 1024.
 */
@Before
public void initPayLoad() {
    maxJournalSizeBytes = 20 * 1024;
    payload = new String(new byte[1024]);
}
@Test
public void testStopOnPageInIOError() throws Exception {
@ -236,6 +244,27 @@ public class JournalFdRecoveryTest {
}
/**
 * Timing probe for broker start-up. With {@code maxJournalSizeBytes} set to
 * {@code Journal.DEFAULT_MAX_FILE_LENGTH} and the payload switched to a string built from
 * 100 zero bytes, it requests 20000 messages from
 * {@code produceMessagesToConsumeMultipleDataFiles}, stops the broker and measures the
 * following {@code doStartBroker(false)} call. Nothing is asserted; the elapsed time is
 * only written to the log.
 */
@Test
public void testRecoveryCheckSpeedSmallMessages() throws Exception {
    final int messageCount = 20000;
    maxJournalSizeBytes = Journal.DEFAULT_MAX_FILE_LENGTH;
    doCreateBroker(true);
    broker.start();

    payload = new String(new byte[100]);
    produceMessagesToConsumeMultipleDataFiles(messageCount);

    broker.stop();
    broker.waitUntilStopped();

    // time only the restart itself
    final Instant restartBegin = Instant.now();
    doStartBroker(false);
    final Duration timeElapsed = Duration.between(restartBegin, Instant.now());
    LOG.info("Elapsed: " + timeElapsed);
}
private long totalOpenFileDescriptorCount(BrokerService broker) {
long result = 0;
try {