[AMQ-6831, AMQ-6771] fix up recovery check to ensure full batch is available in memory, regression from AMQ-6771

This commit is contained in:
gtully 2017-10-10 17:24:09 +01:00
parent 0e6fc19cff
commit f989992278
2 changed files with 45 additions and 1 deletions

View File

@ -567,10 +567,11 @@ public class Journal {
}
private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
ensureAvailable(bs, reader, EOF_RECORD.length);
if (bs.startsWith(EOF_RECORD)) {
return 0; // eof
}
ensureAvailable(bs, reader, BATCH_CONTROL_RECORD_SIZE);
try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {
// Assert that it's a batch record.
@ -623,6 +624,13 @@ public class Journal {
}
}
private void ensureAvailable(ByteSequence bs, RandomAccessFile reader, int required) throws IOException {
if (bs.remaining() < required) {
bs.reset();
bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - bs.length));
}
}
void addToTotalLength(int size) {
totalLength.addAndGet(size);
}

View File

@ -17,8 +17,10 @@
package org.apache.activemq.store.kahadb;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.ByteSequence;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@ -28,6 +30,7 @@ import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
import static org.junit.Assert.*;
public class MessageDatabaseTest {
@ -78,4 +81,37 @@ public class MessageDatabaseTest {
assertNull("audit location should be null", kaha.getMetadata().producerSequenceIdTrackerLocation);
}
@Test
public void testRecoverCheckOnBatchBoundary() throws Exception {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(temporaryFolder.getRoot(), "kaha2"));
kaha.setCheckpointInterval(0l); // disable periodic checkpoint
kaha.setCheckForCorruptJournalFiles(true);
kaha.setChecksumJournalFiles(true);
kaha.setMaxFailoverProducersToTrack(10);
kaha.setBrokerService(new BrokerService() {
public void handleIOException(IOException exception) {
exception.printStackTrace();
}
});
kaha.start();
// track original metadata reference to ensure it is read from the journal on recovery
ActiveMQMessageAuditNoSync auditToVerify = kaha.getMetadata().producerSequenceIdTracker;
final String messsageId = "1:1:1:1";
auditToVerify.isDuplicate(messsageId);
ByteSequence byteSequence = new ByteSequence(new byte[DEFAULT_MAX_WRITE_BATCH_SIZE - 110]);
kaha.getJournal().write(byteSequence, false);
kaha.getJournal().write(byteSequence, false);
kaha.stop();
try {
kaha.start();
assertTrue("Value from journal recovered ok", kaha.getMetadata().producerSequenceIdTracker.isDuplicate(messsageId));
} finally {
kaha.stop();
}
}
}