ARTEMIS-3555 Invalid data could interrupt compacting and shutdown server

This commit is contained in:
Clebert Suconic 2021-11-04 20:56:06 -04:00 committed by clebertsuconic
parent 98a6e42a57
commit 067247178f
2 changed files with 67 additions and 5 deletions

View File

@ -26,6 +26,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.StampedLock; import java.util.concurrent.locks.StampedLock;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.utils.Preconditions.checkArgument; import static org.apache.activemq.artemis.utils.Preconditions.checkArgument;
/** /**
@ -38,6 +40,8 @@ import static org.apache.activemq.artemis.utils.Preconditions.checkArgument;
*/ */
public class ConcurrentLongHashSet { public class ConcurrentLongHashSet {
private static final Logger logger = Logger.getLogger(ConcurrentLongHashSet.class);
private static final long EmptyItem = -1L; private static final long EmptyItem = -1L;
private static final long DeletedItem = -2L; private static final long DeletedItem = -2L;
@ -116,13 +120,17 @@ public class ConcurrentLongHashSet {
} }
public boolean contains(long item) { public boolean contains(long item) {
checkBiggerEqualZero(item); if (!moreThanZero(item)) {
return false;
}
long h = hash(item); long h = hash(item);
return getSection(h).contains(item, (int) h); return getSection(h).contains(item, (int) h);
} }
public boolean add(long item) { public boolean add(long item) {
checkBiggerEqualZero(item); if (!moreThanZero(item)) {
return false;
}
long h = hash(item); long h = hash(item);
return getSection(h).add(item, (int) h); return getSection(h).add(item, (int) h);
} }
@ -134,7 +142,9 @@ public class ConcurrentLongHashSet {
* @return true if removed or false if item was not present * @return true if removed or false if item was not present
*/ */
public boolean remove(long item) { public boolean remove(long item) {
checkBiggerEqualZero(item); if (!moreThanZero(item)) {
return false;
}
long h = hash(item); long h = hash(item);
return getSection(h).remove(item, (int) h); return getSection(h).remove(item, (int) h);
} }
@ -423,9 +433,12 @@ public class ConcurrentLongHashSet {
return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1)); return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
} }
static void checkBiggerEqualZero(long n) { static boolean moreThanZero(long n) {
if (n < 0L) { if (n < 0L) {
throw new IllegalArgumentException("Keys and values must be >= 0"); logger.warn("Keys and values must be >= 0, while it was " + n + ", entry will be ignored", new Exception("invalid record " + n));
return false;
} else {
return true;
} }
} }
} }

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncodin
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase; import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding; import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -241,6 +242,54 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
} }
@Test
public void testInvalidDataCompact() throws Exception {
setup(2, 60 * 1024, false);
final byte recordType = (byte) 0;
journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
AtomicBoolean failed = new AtomicBoolean(false);
journal.setCriticalErrorListener((a, b, c) -> {
new Exception("failed").printStackTrace();
failed.set(true);
});
journal.start();
journal.loadInternalOnly();
journal.appendAddRecord(1, recordType, "test".getBytes(), true);
journal.appendUpdateRecord(1, recordType, "update".getBytes(), true);
// this is simulating a bug on the journal usage
// compacting should just ignore the record
journal.appendUpdateRecordTransactional(3, -1, recordType, "upd".getBytes());
journal.appendPrepareRecord(3, "data".getBytes(), true);
journal.appendUpdateRecordTransactional(4, -1, recordType, "upd".getBytes());
journal.appendCommitRecord(3, true);
try {
AssertionLoggerHandler.startCapture();
journal.testCompact();
Assert.assertTrue(AssertionLoggerHandler.findText("must be"));
Assert.assertFalse(failed.get());
AssertionLoggerHandler.clear();
journal.testCompact();
Assert.assertFalse(AssertionLoggerHandler.findText("must be")); // the invalid records should be cleared during a compact
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@Test @Test
public void testCompactPrepareRestart() throws Exception { public void testCompactPrepareRestart() throws Exception {
setup(2, 60 * 1024, false); setup(2, 60 * 1024, false);