diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java b/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
index 2699856069..ac1e01ae61 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
@@ -50,6 +50,8 @@ public class ByteSequence {
         return offset;
     }
 
+    public int remaining() { return length - offset; }
+
     public void setData(byte[] data) {
         this.data = data;
     }
@@ -71,8 +73,14 @@ public class ByteSequence {
         }
     }
 
+    public void reset() {
+        length = remaining();
+        System.arraycopy(data, offset, data, 0, length);
+        offset = 0;
+    }
+
     public int indexOf(ByteSequence needle, int pos) {
-        int max = length - needle.length;
+        int max = length - needle.length - offset;
         for (int i = pos; i < max; i++) {
             if (matches(needle, i)) {
                 return i;
@@ -102,4 +110,16 @@ public class ByteSequence {
         }
         return -1;
     }
+
+    public boolean startsWith(final byte[] bytes) {
+        if (length - offset < bytes.length) {
+            return false;
+        }
+        for (int i = 0; i < bytes.length; i++) {
+            if (data[offset + i] != bytes[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
 }
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
--- a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
@@ ... @@ public final class DataByteArrayInputStream
     public long readLong() throws IOException {
+        if (pos + 8 > buf.length) {
+            throw new EOFException();
+        }
         long rc = ((long)buf[pos++] << 56) + ((long)(buf[pos++] & 255) << 48) + ((long)(buf[pos++] & 255) << 40) + ((long)(buf[pos++] & 255) << 32);
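The three ByteSequence additions turn the class into a consumable read window: offset marks how much of the buffer has been consumed, remaining() reports what is left, reset() compacts the unconsumed tail to the front of the backing array so the free space can be refilled, and startsWith() tests for a prefix at the current offset. The readLong() guard matters for the same path: checkBatchRecord() below parses the batch control record straight out of such a window, so a truncated buffer must surface as EOFException rather than an out-of-bounds read. A minimal sketch of the window semantics (the class name and sample bytes are illustrative, not from the patch):

```java
import org.apache.activemq.util.ByteSequence;

public class ByteSequenceWindowSketch {
    public static void main(String[] args) {
        byte[] buf = "xxxxHEADERpayload".getBytes(); // 17 bytes
        ByteSequence bs = new ByteSequence(buf);     // offset=0, length=17

        bs.setOffset(4);                             // first 4 bytes consumed
        System.out.println(bs.remaining());          // 13 = length - offset
        System.out.println(bs.startsWith("HEADER".getBytes())); // true, tested at the offset

        bs.reset();                                  // copies bytes 4..16 down to index 0
        System.out.println(bs.getOffset());          // 0
        System.out.println(bs.getLength());          // 13; indices 13..16 can now be refilled
    }
}
```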
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 3321b35e48..40e8f955e1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -480,6 +480,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
             IOHelper.mkdirs(directory);
             if (deleteAllMessages) {
+                getJournal().setCheckForCorruptionOnStartup(false);
                 getJournal().start();
                 getJournal().delete();
                 getJournal().close();
@@ -1048,7 +1049,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
     private Location getNextInitializedLocation(Location location) throws IOException {
         Location mayNotBeInitialized = journal.getNextLocation(location);
-        if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
+        if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
             // need to init size and type to skip
             return journal.getNextLocation(mayNotBeInitialized);
         } else {
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 71c2195c78..548d3b14cb 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.store.kahadb.disk.journal;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Map;
 
 import org.apache.activemq.util.ByteSequence;
@@ -115,38 +116,6 @@ final class DataFileAccessor {
         }
     }
 
-//    public boolean readLocationDetailsAndValidate(Location location) {
-//        try {
-//            WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
-//            if (asyncWrite != null) {
-//                location.setSize(asyncWrite.location.getSize());
-//                location.setType(asyncWrite.location.getType());
-//            } else {
-//                file.seek(location.getOffset());
-//                location.setSize(file.readInt());
-//                location.setType(file.readByte());
-//
-//                byte data[] = new byte[3];
-//                file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
-//                file.readFully(data);
-//                if (data[0] != Journal.ITEM_HEAD_SOR[0]
-//                    || data[1] != Journal.ITEM_HEAD_SOR[1]
-//                    || data[2] != Journal.ITEM_HEAD_SOR[2]) {
-//                    return false;
-//                }
-//                file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
-//                file.readFully(data);
-//                if (data[0] != Journal.ITEM_HEAD_EOR[0]
-//                    || data[1] != Journal.ITEM_HEAD_EOR[1]
-//                    || data[2] != Journal.ITEM_HEAD_EOR[2]) {
-//                    return false;
-//                }
-//            }
-//        } catch (IOException e) {
-//            return false;
-//        }
-//        return true;
-//    }
 
     public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
 
@@ -157,4 +126,8 @@ final class DataFileAccessor {
             file.sync();
         }
     }
+
+    public RecoverableRandomAccessFile getRaf() {
+        return file;
+    }
 }
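Two independent fixes here. The null check in getNextInitializedLocation() closes an NPE: Journal.getNextLocation() returns null once there is no further record, and the old code dereferenced the result unconditionally. The deleteAllMessages change skips the startup corruption scan because the journal files are deleted immediately after start(), so scanning them first is pure waste. The journal-level flag is normally driven from the persistence adapter; a hedged configuration sketch (the property combination shown is illustrative, not from the patch):

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

public class CorruptionCheckConfigSketch {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
        // Adapter option that feeds Journal.setCheckForCorruptionOnStartup().
        kahaDB.setCheckForCorruptJournalFiles(true);
        broker.setPersistenceAdapter(kahaDB);
        // With delete-all set, the scan would cover files that are removed
        // right after the journal starts, which is what MessageDatabase now
        // avoids by clearing the journal flag on that path.
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();
        broker.stop();
    }
}
```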
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index a78cc65bf8..5edee92f91 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -26,7 +26,6 @@ import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -90,12 +89,21 @@ public class Journal {
                 // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
                 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
                 try {
-                    int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
-                    Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
+                    RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
+                    randomAccessFile.seek(recoveryPosition.getOffset() + 1);
+                    byte[] data = new byte[getWriteBatchSize()];
+                    ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
+                    int nextOffset = 0;
+                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
+                        nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
+                    } else {
+                        nextOffset = Math.toIntExact(randomAccessFile.length());
+                    }
+                    Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
                     LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
 
                     // skip corruption on getNextLocation
-                    recoveryPosition.setOffset((int) sequence.getLast() + 1);
+                    recoveryPosition.setOffset(nextOffset);
                     recoveryPosition.setSize(-1);
 
                     dataFile.corruptedBlocks.add(sequence);
@@ -463,21 +471,19 @@ public class Journal {
     }
 
     public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
-        int firstBatchRecordSize = -1;
         if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
-            Location location = new Location();
-            location.setDataFileId(dataFile.getDataFileId());
-            location.setOffset(0);
-
             DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
             try {
-                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
+                byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
+                reader.readFully(0, firstFewBytes);
+                ByteSequence bs = new ByteSequence(firstFewBytes);
+                return bs.startsWith(EOF_RECORD);
             } catch (Exception ignored) {
             } finally {
                 accessorPool.closeDataFileAccessor(reader);
             }
         }
-        return firstBatchRecordSize == 0;
+        return false;
     }
 
     protected Location recoveryCheck(DataFile dataFile) throws IOException {
@@ -487,9 +493,15 @@ public class Journal {
 
         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
+            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
+            randomAccessFile.seek(0);
+            final long totalFileLength = randomAccessFile.length();
+            byte[] data = new byte[getWriteBatchSize()];
+            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
+
             while (true) {
-                int size = checkBatchRecord(reader, location.getOffset());
-                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
+                int size = checkBatchRecord(bs, randomAccessFile);
+                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
                     if (size == 0) {
                         // eof batch record
                         break;
@@ -500,8 +512,8 @@ public class Journal {
                     // Perhaps it's just some corruption... scan through the
                     // file to find the next valid batch record. We
                     // may have subsequent valid batch records.
-                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
-                    if (nextOffset >= 0) {
+                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
+                        int nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
                         Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                         LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
                         dataFile.corruptedBlocks.add(sequence);
@@ -533,41 +545,33 @@ public class Journal {
         return location;
     }
 
-    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
-        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
-        byte data[] = new byte[1024*4];
-        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
-
+    private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
+        final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
         int pos = 0;
         while (true) {
-            pos = bs.indexOf(header, pos);
+            pos = bs.indexOf(header, 0);
             if (pos >= 0) {
-                return offset + pos;
+                bs.setOffset(bs.offset + pos);
+                return pos;
             } else {
                 // need to load the next data chunck in..
-                if (bs.length != data.length) {
+                if (bs.length != bs.data.length) {
                     // If we had a short read then we were at EOF
                     return -1;
                 }
-                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
-                bs = new ByteSequence(data, 0, reader.read(offset, data));
-                pos = 0;
+                bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
+                bs.reset();
+                bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
             }
         }
     }
 
-    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
-        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
+    private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
 
-        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);) {
-
-            reader.readFully(offset, controlRecord);
-
-            // check for journal eof
-            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
-                // eof batch
-                return 0;
-            }
+        if (bs.startsWith(EOF_RECORD)) {
+            return 0; // eof
+        }
+        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {
 
             // Assert that it's a batch record.
             for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
@@ -578,28 +582,43 @@ public class Journal {
 
             int size = controlIs.readInt();
             if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
-                return -1;
+                return -2;
             }
 
-            if (isChecksum()) {
+            long expectedChecksum = controlIs.readLong();
+            Checksum checksum = null;
+            if (isChecksum() && expectedChecksum > 0) {
+                checksum = new Adler32();
+            }
 
-                long expectedChecksum = controlIs.readLong();
-                if (expectedChecksum == 0) {
-                    // Checksuming was not enabled when the record was stored.
-                    // we can't validate the record :(
-                    return size;
-                }
+            // revert to bs to consume data
+            bs.setOffset(controlIs.position());
+            int toRead = size;
+            while (toRead > 0) {
+                if (bs.remaining() >= toRead) {
+                    if (checksum != null) {
+                        checksum.update(bs.getData(), bs.getOffset(), toRead);
+                    }
+                    bs.setOffset(bs.offset + toRead);
+                    toRead = 0;
+                } else {
+                    if (bs.length != bs.data.length) {
+                        // buffer exhausted
+                        return -3;
+                    }
 
-                byte data[] = new byte[size];
-                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);
-
-                Checksum checksum = new Adler32();
-                checksum.update(data, 0, data.length);
-
-                if (expectedChecksum != checksum.getValue()) {
-                    return -1;
+                    toRead -= bs.remaining();
+                    if (checksum != null) {
+                        checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
+                    }
+                    bs.setLength(reader.read(bs.data));
+                    bs.setOffset(0);
                 }
             }
+            if (checksum != null && expectedChecksum != checksum.getValue()) {
+                return -4;
+            }
+
             return size;
         }
     }
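Both scanning methods now share one window: findNextBatchRecord() compacts the last BATCH_CONTROL_RECORD_HEADER.length bytes before each refill so a header spanning two reads is still found, and checkBatchRecord() streams the payload through the Adler32 from the same window instead of allocating a byte[size] per batch. The distinct negative return codes (-2 bad size field, -3 window exhausted, -4 checksum mismatch) separate failure modes that previously all collapsed into -1. Note also how callers recover an absolute file offset from the window: the current position is always filePointer - bs.remaining(), because remaining() counts exactly the bytes read but not yet consumed. A self-contained sketch of the chunked checksum pattern (simplified: plain forward reads, no shared window or header parsing):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

public class ChunkedChecksumSketch {
    // Checksum `size` bytes starting at `payloadOffset`, reading the file in
    // fixed-size chunks so no size-proportional allocation is needed.
    static long checksumRecord(RandomAccessFile file, long payloadOffset,
                               int size, int chunkSize) throws IOException {
        Checksum checksum = new Adler32();
        byte[] chunk = new byte[chunkSize];
        file.seek(payloadOffset);
        int toRead = size;
        while (toRead > 0) {
            int n = file.read(chunk, 0, Math.min(chunkSize, toRead));
            if (n < 0) {
                throw new IOException("unexpected EOF, " + toRead + " bytes unread");
            }
            checksum.update(chunk, 0, n);
            toRead -= n;
        }
        return checksum.getValue();
    }
}
```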
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
index f9e6a4512c..309272a870 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
@@ -48,7 +48,7 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
         this(new File(name), mode);
     }
 
-    protected RandomAccessFile getRaf() throws IOException {
+    public RandomAccessFile getRaf() throws IOException {
         if (raf == null) {
             raf = new RandomAccessFile(file, mode);
         }
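Widening getRaf() to public is what lets the journal reach the underlying RandomAccessFile through DataFileAccessor.getRaf().getRaf() and stream a whole data file through one buffer, as recoveryCheck() above does. A condensed sketch of that wiring (assumes an open Journal journal, its accessorPool, and a DataFile dataFile, as in the patch):

```java
// One buffered window per data file, sized to the journal's write batch:
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
randomAccessFile.seek(0);
byte[] data = new byte[journal.getWriteBatchSize()];
ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
// checkBatchRecord()/findNextBatchRecord() consume bs and refill it from
// randomAccessFile only when a record crosses the window boundary.
```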
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index 221b0875ec..16598ea7e5 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -64,10 +64,12 @@ public class JournalCorruptionEofIndexRecoveryTest {
     private String connectionUri;
     private KahaDBPersistenceAdapter adapter;
     private boolean ignoreMissingJournalFiles = false;
+    private int journalMaxBatchSize;
 
     private final Destination destination = new ActiveMQQueue("Test");
     private final String KAHADB_DIRECTORY = "target/activemq-data/";
     private final String payload = new String(new byte[1024]);
+    File brokerDataDir = null;
 
     protected void startBroker() throws Exception {
         doStartBroker(true, false);
@@ -78,14 +80,13 @@ public class JournalCorruptionEofIndexRecoveryTest {
 
     protected void restartBroker(boolean whackIndex, boolean forceRecoverIndex) throws Exception {
-        File dataDir = broker.getPersistenceAdapter().getDirectory();
 
         if (broker != null) {
             broker.stop();
             broker.waitUntilStopped();
         }
 
         if (whackIndex) {
-            File indexToDelete = new File(dataDir, "db.data");
+            File indexToDelete = new File(brokerDataDir, "db.data");
             LOG.info("Whacking index: " + indexToDelete);
             indexToDelete.delete();
         }
@@ -113,6 +114,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
         cf = new ActiveMQConnectionFactory(connectionUri);
 
         broker.start();
+        brokerDataDir = broker.getPersistenceAdapter().getDirectory();
         LOG.info("Starting broker..");
     }
 
@@ -124,6 +126,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
         // ensure there are a bunch of data files but multiple entries in each
         adapter.setJournalMaxFileLength(1024 * 20);
 
+        adapter.setJournalMaxWriteBatchSize(journalMaxBatchSize);
+
         // speed up the test case, checkpoint an cleanup early and often
         adapter.setCheckpointInterval(5000);
         adapter.setCleanupInterval(5000);
@@ -146,6 +150,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
     @Before
     public void reset() throws Exception {
         ignoreMissingJournalFiles = true;
+        journalMaxBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     }
 
     @Test
@@ -234,6 +239,20 @@ public class JournalCorruptionEofIndexRecoveryTest {
         assertEquals("Drain", numToSend, drainQueue(numToSend));
     }
 
+    @Test
+    public void testRecoverIndexWithSmallBatch() throws Exception {
+        journalMaxBatchSize = 2 * 1024;
+        startBroker();
+
+        final int numToSend = 4;
+        produceMessagesToConsumeMultipleDataFiles(numToSend);
+
+        // force journal replay by forcing a full index recovery
+        restartBroker(false, true);
+
+        assertEquals("Drain", numToSend, drainQueue(numToSend));
+    }
+
     @Test
     public void testRecoveryAfterProducerAuditLocationCorrupt() throws Exception {
 
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
index 633ab5c8da..ffe8ab6b7f 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.store.kahadb.disk.journal.DataFile;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ import javax.management.Attribute;
 import javax.management.ObjectName;
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -54,8 +57,7 @@ public class JournalFdRecoveryTest {
     private static final Logger LOG = LoggerFactory.getLogger(JournalFdRecoveryTest.class);
 
     private final String KAHADB_DIRECTORY = "target/activemq-data/";
-    private final String payload = new String(new byte[1024]);
-
+    private String payload;
     private ActiveMQConnectionFactory cf = null;
     private BrokerService broker = null;
     private final Destination destination = new ActiveMQQueue("Test");
@@ -63,6 +65,7 @@ public class JournalFdRecoveryTest {
     private KahaDBPersistenceAdapter adapter;
 
     public byte fill = Byte.valueOf("3");
+    private int maxJournalSizeBytes;
 
     protected void startBroker() throws Exception {
         doStartBroker(true);
@@ -88,7 +91,6 @@ public class JournalFdRecoveryTest {
     }
 
     private void doCreateBroker(boolean delete) throws Exception {
-
         broker = new BrokerService();
         broker.setDeleteAllMessagesOnStartup(delete);
         broker.setPersistent(true);
@@ -112,7 +114,7 @@ public class JournalFdRecoveryTest {
         adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
 
         // ensure there are a bunch of data files but multiple entries in each
-        adapter.setJournalMaxFileLength(1024 * 20);
+        adapter.setJournalMaxFileLength(maxJournalSizeBytes);
 
         // speed up the test case, checkpoint an cleanup early and often
         adapter.setCheckpointInterval(5000);
@@ -132,6 +134,12 @@ public class JournalFdRecoveryTest {
         }
     }
 
+    @Before
+    public void initPayLoad() {
+        payload = new String(new byte[1024]);
+        maxJournalSizeBytes = 1024 * 20;
+    }
+
     @Test
     public void testStopOnPageInIOError() throws Exception {
 
@@ -236,6 +244,27 @@ public class JournalFdRecoveryTest {
 
     }
 
+    @Test
+    public void testRecoveryCheckSpeedSmallMessages() throws Exception {
+        maxJournalSizeBytes = Journal.DEFAULT_MAX_FILE_LENGTH;
+        doCreateBroker(true);
+        broker.start();
+
+        int toSend = 20000;
+        payload = new String(new byte[100]);
+        produceMessagesToConsumeMultipleDataFiles(toSend);
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        Instant b = Instant.now();
+        doStartBroker(false);
+        Instant e = Instant.now();
+
+        Duration timeElapsed = Duration.between(b, e);
+        LOG.info("Elapsed: " + timeElapsed);
+    }
+
     private long totalOpenFileDescriptorCount(BrokerService broker) {
         long result = 0;
         try {
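The new speed test only logs the elapsed time of the post-restart recovery check rather than asserting on it. If a regression guard were wanted, a generous wall-clock bound could be added at the end of the test; a hedged sketch (the threshold is arbitrary, not from the patch):

```java
// after: Duration timeElapsed = Duration.between(b, e);
org.junit.Assert.assertTrue("recovery check too slow: " + timeElapsed,
        timeElapsed.toMillis() < 60_000);
```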