[AMQ-6771] do linear sequential scan of journal when validating checksums - remove batch reads via seek/read which depend on write batch size

(cherry picked from commit 8c218ee05d)
gtully authored 2017-07-17 12:18:25 +01:00; committed by Timothy Bish
parent 7a2c4eecb2
commit ba5e814667
8 changed files with 158 additions and 95 deletions
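
In outline: checksum validation used to probe each batch record with a seek()/read() pair sized around the write batch, so recovery behaviour depended on the configured write batch size matching what was on disk. The patch instead unwraps the journal's RandomAccessFile, fills a single write-batch-sized buffer per sequential read, and slides a ByteSequence window across it, refilling as it drains. A minimal, self-contained sketch of that scan pattern (the class name, MARKER bytes and CHUNK size are invented for illustration; the real scan below uses BATCH_CONTROL_RECORD_HEADER and getWriteBatchSize()):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    // Sliding-buffer scan: read large sequential chunks and search them in
    // memory, instead of issuing a seek()+read() pair per record probe.
    // MARKER and CHUNK are invented for this demo.
    public class SequentialScanSketch {

        static final byte[] MARKER = { 'W', 'B' }; // hypothetical 2-byte marker
        static final int CHUNK = 4 * 1024;

        // Returns the absolute offset of the next marker at or after the
        // current file position, or -1 when EOF is reached first.
        public static long findMarker(RandomAccessFile file) throws IOException {
            byte[] buf = new byte[CHUNK];
            long base = file.getFilePointer(); // absolute offset of buf[0]
            int len = file.read(buf);
            while (len > 0) {
                for (int i = 0; i + MARKER.length <= len; i++) {
                    if (buf[i] == MARKER[0] && buf[i + 1] == MARKER[1]) {
                        return base + i;
                    }
                }
                if (len < buf.length) {
                    return -1; // short read means we hit EOF
                }
                // carry the tail over so a marker split across chunks is found
                int keep = MARKER.length - 1;
                System.arraycopy(buf, len - keep, buf, 0, keep);
                base += len - keep;
                int n = file.read(buf, keep, buf.length - keep);
                len = keep + Math.max(0, n);
                if (n <= 0) {
                    return -1;
                }
            }
            return -1;
        }
    }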

File: ByteSequence.java

@@ -50,6 +50,8 @@ public class ByteSequence {
         return offset;
     }
 
+    public int remaining() { return length - offset; }
+
     public void setData(byte[] data) {
         this.data = data;
     }
@@ -71,8 +73,14 @@ public class ByteSequence {
         }
     }
 
+    public void reset() {
+        length = remaining();
+        System.arraycopy(data, offset, data, 0, length);
+        offset = 0;
+    }
+
     public int indexOf(ByteSequence needle, int pos) {
-        int max = length - needle.length;
+        int max = length - needle.length - offset;
         for (int i = pos; i < max; i++) {
             if (matches(needle, i)) {
                 return i;
@@ -102,4 +110,16 @@ public class ByteSequence {
         }
         return -1;
     }
+
+    public boolean startsWith(final byte[] bytes) {
+        if (length - offset < bytes.length) {
+            return false;
+        }
+        for (int i = 0; i<bytes.length; i++) {
+            if (data[offset+i] != bytes[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
 }
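
The new reset() compacts the sequence: whatever sits between offset and length is copied to the front of the backing array and offset drops back to 0, so the unconsumed tail of a buffer can be topped up in place by the next read. A hypothetical walk-through (not code from the patch), assuming org.apache.activemq.util.ByteSequence with the methods added above:

    // A 10-byte buffer of which 4 bytes have already been consumed.
    byte[] buf = "0123456789".getBytes();
    ByteSequence bs = new ByteSequence(buf, 0, buf.length);
    bs.setOffset(4);            // 6 bytes still unconsumed: "456789"
    int tail = bs.remaining();  // 6
    bs.reset();                 // copies "456789" to buf[0..5]; offset back to 0
    // buf[6..9] is now free, so a caller can refill in place, e.g. with a
    // hypothetical RandomAccessFile raf:
    // bs.setLength(tail + raf.read(buf, tail, buf.length - tail));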

File: DataByteArrayInputStream.java

@@ -65,6 +65,8 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
         return pos - offset;
     }
 
+    public int position() { return pos; }
+
     /**
      * @return the underlying data array
      */
@@ -224,7 +226,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
     }
 
     public long readLong() throws IOException {
-        if (pos >= buf.length ) {
+        if (pos + 8 >= buf.length ) {
             throw new EOFException();
         }
         long rc = ((long)buf[pos++] << 56) + ((long)(buf[pos++] & 255) << 48) + ((long)(buf[pos++] & 255) << 40) + ((long)(buf[pos++] & 255) << 32);

File: MessageDatabase.java

@@ -480,6 +480,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
             IOHelper.mkdirs(directory);
             if (deleteAllMessages) {
+                getJournal().setCheckForCorruptionOnStartup(false);
                 getJournal().start();
                 getJournal().delete();
                 getJournal().close();
@@ -1048,7 +1049,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private Location getNextInitializedLocation(Location location) throws IOException {
         Location mayNotBeInitialized = journal.getNextLocation(location);
-        if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
+        if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
            // need to init size and type to skip
            return journal.getNextLocation(mayNotBeInitialized);
        } else {

File: DataFileAccessor.java

@@ -17,6 +17,7 @@
 package org.apache.activemq.store.kahadb.disk.journal;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Map;
 
 import org.apache.activemq.util.ByteSequence;
@@ -115,38 +116,6 @@ final class DataFileAccessor {
         }
     }
 
-//    public boolean readLocationDetailsAndValidate(Location location) {
-//        try {
-//            WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
-//            if (asyncWrite != null) {
-//                location.setSize(asyncWrite.location.getSize());
-//                location.setType(asyncWrite.location.getType());
-//            } else {
-//                file.seek(location.getOffset());
-//                location.setSize(file.readInt());
-//                location.setType(file.readByte());
-//
-//                byte data[] = new byte[3];
-//                file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
-//                file.readFully(data);
-//                if (data[0] != Journal.ITEM_HEAD_SOR[0]
-//                    || data[1] != Journal.ITEM_HEAD_SOR[1]
-//                    || data[2] != Journal.ITEM_HEAD_SOR[2]) {
-//                    return false;
-//                }
-//                file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
-//                file.readFully(data);
-//                if (data[0] != Journal.ITEM_HEAD_EOR[0]
-//                    || data[1] != Journal.ITEM_HEAD_EOR[1]
-//                    || data[2] != Journal.ITEM_HEAD_EOR[2]) {
-//                    return false;
-//                }
-//            }
-//        } catch (IOException e) {
-//            return false;
-//        }
-//        return true;
-//    }
-
     public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
@@ -157,4 +126,8 @@ final class DataFileAccessor {
             file.sync();
         }
     }
+
+    public RecoverableRandomAccessFile getRaf() {
+        return file;
+    }
 }
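
Exposing the raw file handle (getRaf() here, plus the now-public RecoverableRandomAccessFile.getRaf() below) is what lets recovery run one buffered sequential pass instead of per-record reads. The consuming pattern, assembled from the Journal.java hunks that follow (a sketch; getWriteBatchSize() is the journal's configured batch size):

    // Borrow an accessor from the pool, unwrap the RandomAccessFile for a
    // sequential pass, and always return the accessor to the pool.
    DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
    try {
        RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
        randomAccessFile.seek(0);
        byte[] data = new byte[getWriteBatchSize()];
        ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
        // ... scan bs, refilling from randomAccessFile as it drains ...
    } finally {
        accessorPool.closeDataFileAccessor(reader);
    }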

File: Journal.java

@@ -26,7 +26,6 @@ import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -90,12 +89,21 @@ public class Journal {
                 // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
                 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
                 try {
-                    int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
-                    Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
+                    RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
+                    randomAccessFile.seek(recoveryPosition.getOffset() + 1);
+                    byte[] data = new byte[getWriteBatchSize()];
+                    ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
+                    int nextOffset = 0;
+                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
+                        nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
+                    } else {
+                        nextOffset = Math.toIntExact(randomAccessFile.length());
+                    }
+                    Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
                     LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
                     // skip corruption on getNextLocation
-                    recoveryPosition.setOffset((int) sequence.getLast() + 1);
+                    recoveryPosition.setOffset(nextOffset);
                     recoveryPosition.setSize(-1);
                     dataFile.corruptedBlocks.add(sequence);
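
The nextOffset arithmetic above is worth spelling out: after a successful findNextBatchRecord, the file pointer sits at the end of the last chunk read into the buffer, and bs.remaining() counts the buffered bytes from the header match onward, so pointer minus remaining is the absolute offset of the match. With invented numbers:

    // Toy numbers only: after reading a third 4 KiB chunk the file pointer
    // is at 12288; 1000 buffered bytes remain from the header match onward,
    // so the match starts at absolute offset 12288 - 1000 = 11288.
    long filePointer = 12288;
    int remaining = 1000;
    long nextOffset = filePointer - remaining; // 11288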
@@ -463,21 +471,19 @@ public class Journal {
     }
 
     public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
-        int firstBatchRecordSize = -1;
         if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
-            Location location = new Location();
-            location.setDataFileId(dataFile.getDataFileId());
-            location.setOffset(0);
             DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
             try {
-                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
+                byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
+                reader.readFully(0, firstFewBytes);
+                ByteSequence bs = new ByteSequence(firstFewBytes);
+                return bs.startsWith(EOF_RECORD);
             } catch (Exception ignored) {
             } finally {
                 accessorPool.closeDataFileAccessor(reader);
             }
         }
-        return firstBatchRecordSize == 0;
+        return false;
     }
 
     protected Location recoveryCheck(DataFile dataFile) throws IOException {
@@ -487,9 +493,15 @@ public class Journal {
         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
+            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
+            randomAccessFile.seek(0);
+            final long totalFileLength = randomAccessFile.length();
+            byte[] data = new byte[getWriteBatchSize()];
+            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
             while (true) {
-                int size = checkBatchRecord(reader, location.getOffset());
-                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
+                int size = checkBatchRecord(bs, randomAccessFile);
+                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
                     if (size == 0) {
                         // eof batch record
                         break;
@@ -500,8 +512,8 @@ public class Journal {
                     // Perhaps it's just some corruption... scan through the
                     // file to find the next valid batch record. We
                     // may have subsequent valid batch records.
-                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
-                    if (nextOffset >= 0) {
+                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
+                        int nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
                         Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                         LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
                         dataFile.corruptedBlocks.add(sequence);
@@ -533,41 +545,33 @@ public class Journal {
         return location;
     }
 
-    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
-        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
-        byte data[] = new byte[1024*4];
-        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
+    private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
+        final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
         int pos = 0;
         while (true) {
-            pos = bs.indexOf(header, pos);
+            pos = bs.indexOf(header, 0);
             if (pos >= 0) {
-                return offset + pos;
+                bs.setOffset(bs.offset + pos);
+                return pos;
             } else {
                 // need to load the next data chunck in..
-                if (bs.length != data.length) {
+                if (bs.length != bs.data.length) {
                     // If we had a short read then we were at EOF
                     return -1;
                 }
-                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
-                bs = new ByteSequence(data, 0, reader.read(offset, data));
-                pos = 0;
+                bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
+                bs.reset();
+                bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
             }
         }
     }
 
-    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
-        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
-        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);) {
-
-            reader.readFully(offset, controlRecord);
-
-            // check for journal eof
-            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
-                // eof batch
-                return 0;
-            }
+    private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
+        if (bs.startsWith(EOF_RECORD)) {
+            return 0; // eof
+        }
+        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {
 
             // Assert that it's a batch record.
             for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
@@ -578,28 +582,43 @@ public class Journal {
             int size = controlIs.readInt();
             if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
-                return -1;
+                return -2;
             }
 
-            if (isChecksum()) {
-
-                long expectedChecksum = controlIs.readLong();
-                if (expectedChecksum == 0) {
-                    // Checksuming was not enabled when the record was stored.
-                    // we can't validate the record :(
-                    return size;
-                }
+            long expectedChecksum = controlIs.readLong();
+            Checksum checksum = null;
+            if (isChecksum() && expectedChecksum > 0) {
+                checksum = new Adler32();
+            }
 
-                byte data[] = new byte[size];
-                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);
-
-                Checksum checksum = new Adler32();
-                checksum.update(data, 0, data.length);
-
-                if (expectedChecksum != checksum.getValue()) {
-                    return -1;
-                }
-            }
+            // revert to bs to consume data
+            bs.setOffset(controlIs.position());
+            int toRead = size;
+            while (toRead > 0) {
+                if (bs.remaining() >= toRead) {
+                    if (checksum != null) {
+                        checksum.update(bs.getData(), bs.getOffset(), toRead);
+                    }
+                    bs.setOffset(bs.offset + toRead);
+                    toRead = 0;
+                } else {
+                    if (bs.length != bs.data.length) {
+                        // buffer exhausted
+                        return -3;
+                    }
+                    toRead -= bs.remaining();
+                    if (checksum != null) {
+                        checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
+                    }
+                    bs.setLength(reader.read(bs.data));
+                    bs.setOffset(0);
+                }
+            }
+            if (checksum != null && expectedChecksum != checksum.getValue()) {
+                return -4;
+            }
             return size;
         }
     }
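
Stripped of the ByteSequence buffer juggling, the new checkBatchRecord boils down to: match the batch header, read a 4-byte size and an 8-byte expected Adler-32 value, then stream size payload bytes through the checksum. A self-contained sketch of that walk over a plain InputStream (HEADER is a placeholder for BATCH_CONTROL_RECORD_HEADER, and the negative return codes only loosely mirror the -1/-2/-3/-4 above):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.Adler32;
    import java.util.zip.Checksum;

    // Standalone sketch of the batch validation: header, size, expected
    // checksum, then the payload streamed through Adler32 in chunks.
    public class BatchCheckSketch {

        static final byte[] HEADER = { 'B', 'A', 'T', 'C', 'H' }; // placeholder

        /** @return payload size if the batch checks out, negative on failure */
        public static int checkBatch(InputStream in) throws IOException {
            DataInputStream dis = new DataInputStream(in);
            for (byte b : HEADER) {
                if (dis.readByte() != b) {
                    return -1; // not a batch record
                }
            }
            int size = dis.readInt();
            if (size < 0) {
                return -2; // nonsense size
            }
            long expected = dis.readLong();
            Checksum checksum = new Adler32();
            byte[] chunk = new byte[4 * 1024];
            int toRead = size;
            while (toRead > 0) {
                int n = dis.read(chunk, 0, Math.min(chunk.length, toRead));
                if (n < 0) {
                    return -3; // ran out of data mid-payload
                }
                checksum.update(chunk, 0, n);
                toRead -= n;
            }
            // expected == 0 means checksums were disabled when written
            if (expected > 0 && expected != checksum.getValue()) {
                return -4; // checksum mismatch
            }
            return size;
        }
    }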

File: RecoverableRandomAccessFile.java

@@ -48,7 +48,7 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
         this(new File(name), mode);
     }
 
-    protected RandomAccessFile getRaf() throws IOException {
+    public RandomAccessFile getRaf() throws IOException {
         if (raf == null) {
             raf = new RandomAccessFile(file, mode);
         }

File: JournalCorruptionEofIndexRecoveryTest.java

@@ -64,10 +64,12 @@ public class JournalCorruptionEofIndexRecoveryTest {
     private String connectionUri;
     private KahaDBPersistenceAdapter adapter;
     private boolean ignoreMissingJournalFiles = false;
+    private int journalMaxBatchSize;
 
     private final Destination destination = new ActiveMQQueue("Test");
     private final String KAHADB_DIRECTORY = "target/activemq-data/";
     private final String payload = new String(new byte[1024]);
+    File brokerDataDir = null;
 
     protected void startBroker() throws Exception {
         doStartBroker(true, false);
@@ -78,14 +80,13 @@ public class JournalCorruptionEofIndexRecoveryTest {
     }
 
     protected void restartBroker(boolean whackIndex, boolean forceRecoverIndex) throws Exception {
-        File dataDir = broker.getPersistenceAdapter().getDirectory();
-
         if (broker != null) {
             broker.stop();
             broker.waitUntilStopped();
         }
 
         if (whackIndex) {
-            File indexToDelete = new File(dataDir, "db.data");
+            File indexToDelete = new File(brokerDataDir, "db.data");
             LOG.info("Whacking index: " + indexToDelete);
             indexToDelete.delete();
         }
@@ -113,6 +114,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
         cf = new ActiveMQConnectionFactory(connectionUri);
 
         broker.start();
+        brokerDataDir = broker.getPersistenceAdapter().getDirectory();
         LOG.info("Starting broker..");
     }
 
@@ -124,6 +126,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
         // ensure there are a bunch of data files but multiple entries in each
         adapter.setJournalMaxFileLength(1024 * 20);
 
+        adapter.setJournalMaxWriteBatchSize(journalMaxBatchSize);
+
         // speed up the test case, checkpoint an cleanup early and often
         adapter.setCheckpointInterval(5000);
         adapter.setCleanupInterval(5000);
@@ -146,6 +150,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
     @Before
     public void reset() throws Exception {
         ignoreMissingJournalFiles = true;
+        journalMaxBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     }
 
     @Test
@@ -234,6 +239,20 @@ public class JournalCorruptionEofIndexRecoveryTest {
         assertEquals("Drain", numToSend, drainQueue(numToSend));
     }
 
+    @Test
+    public void testRecoverIndexWithSmallBatch() throws Exception {
+        journalMaxBatchSize = 2 * 1024;
+        startBroker();
+
+        final int numToSend = 4;
+        produceMessagesToConsumeMultipleDataFiles(numToSend);
+
+        // force journal replay by whacking the index
+        restartBroker(false, true);
+
+        assertEquals("Drain", numToSend, drainQueue(numToSend));
+    }
+
     @Test
     public void testRecoveryAfterProducerAuditLocationCorrupt() throws Exception {

File: JournalFdRecoveryTest.java

@@ -25,6 +25,7 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.store.kahadb.disk.journal.DataFile;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ import javax.management.Attribute;
 import javax.management.ObjectName;
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -54,8 +57,7 @@ public class JournalFdRecoveryTest {
     private static final Logger LOG = LoggerFactory.getLogger(JournalFdRecoveryTest.class);
 
     private final String KAHADB_DIRECTORY = "target/activemq-data/";
-    private final String payload = new String(new byte[1024]);
-
+    private String payload;
     private ActiveMQConnectionFactory cf = null;
     private BrokerService broker = null;
     private final Destination destination = new ActiveMQQueue("Test");
@@ -63,6 +65,7 @@ public class JournalFdRecoveryTest {
     private KahaDBPersistenceAdapter adapter;
 
     public byte fill = Byte.valueOf("3");
+    private int maxJournalSizeBytes;
 
     protected void startBroker() throws Exception {
         doStartBroker(true);
@@ -88,7 +91,6 @@ public class JournalFdRecoveryTest {
     }
 
     private void doCreateBroker(boolean delete) throws Exception {
-
         broker = new BrokerService();
         broker.setDeleteAllMessagesOnStartup(delete);
         broker.setPersistent(true);
@@ -112,7 +114,7 @@ public class JournalFdRecoveryTest {
         adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
 
         // ensure there are a bunch of data files but multiple entries in each
-        adapter.setJournalMaxFileLength(1024 * 20);
+        adapter.setJournalMaxFileLength(maxJournalSizeBytes);
 
         // speed up the test case, checkpoint an cleanup early and often
         adapter.setCheckpointInterval(5000);
@@ -132,6 +134,12 @@ public class JournalFdRecoveryTest {
         }
     }
 
+    @Before
+    public void initPayLoad() {
+        payload = new String(new byte[1024]);
+        maxJournalSizeBytes = 1024 * 20;
+    }
+
     @Test
     public void testStopOnPageInIOError() throws Exception {
@@ -236,6 +244,27 @@ public class JournalFdRecoveryTest {
     }
 
+    @Test
+    public void testRecoveryCheckSpeedSmallMessages() throws Exception {
+        maxJournalSizeBytes = Journal.DEFAULT_MAX_FILE_LENGTH;
+        doCreateBroker(true);
+        broker.start();
+
+        int toSend = 20000;
+        payload = new String(new byte[100]);
+        produceMessagesToConsumeMultipleDataFiles(toSend);
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        Instant b = Instant.now();
+        doStartBroker(false);
+        Instant e = Instant.now();
+
+        Duration timeElapsed = Duration.between(b, e);
+        LOG.info("Elapsed: " + timeElapsed);
+    }
+
     private long totalOpenFileDescriptorCount(BrokerService broker) {
         long result = 0;
         try {