NO-JIRA Adding a Compacting Test

I was debugging Compacting, looking for a possible issue here in these conditions.
even though I found nothing wrong with the code, I still want to keep the test as there's no such thing as enough testing.
This commit is contained in:
Clebert Suconic 2022-09-26 09:29:06 -04:00
parent 3d98ebc262
commit 1a8c458906

View File

@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -38,6 +39,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.AbstractJournalUpdateTask;
@ -285,6 +287,166 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@Test
public void testSimultaneousDeleteUpdate() throws Exception {
setup(2, 60 * 1024, false);
ExecutorService executorService = Executors.newFixedThreadPool(3);
runAfter(executorService::shutdownNow);
AtomicBoolean running = new AtomicBoolean(true);
final byte recordType = (byte) 0;
journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
journal.start();
journal.loadInternalOnly();
CountDownLatch latchDone = new CountDownLatch(3);
AtomicBoolean failed = new AtomicBoolean(false);
CyclicBarrier barrier = new CyclicBarrier(3);
executorService.execute(() -> {
int numberCompact = 0;
while (running.get()) {
try {
if (barrier.await(1, TimeUnit.SECONDS) == -1) {
continue;
}
} catch (Throwable ignored) {
continue;
}
numberCompact++;
if (numberCompact % 10 == 0) {
logger.debugf("Compact %s", numberCompact);
}
journal.testCompact();
}
latchDone.countDown();
});
executorService.execute(() -> {
while (running.get()) {
long tx = idGenerator.generateID();
long record = idGenerator.generateID();
try {
journal.appendAddRecordTransactional(tx, record, recordType, "test".getBytes());
journal.appendCommitRecord(tx, false);
journal.appendDeleteRecord(record, true);
try {
journal.appendDeleteRecord(idGenerator.generateID(), false);
} catch (Exception ignored) {
}
try {
if (barrier.await(1, TimeUnit.SECONDS) == -1) {
continue;
}
} catch (Exception ignored) {
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
failed.set(true);
}
}
latchDone.countDown();
});
executorService.execute(() -> {
while (running.get()) {
long tx = idGenerator.generateID();
long record = idGenerator.generateID();
try {
journal.appendAddRecordTransactional(tx, record, recordType, "test".getBytes());
journal.appendRollbackRecord(tx, true);
} catch (Exception e) {
e.printStackTrace();
failed.set(true);
}
}
latchDone.countDown();
});
long recordId = idGenerator.generateID();
for (int i = 0; i < 100; i++) {
if (i % 10 == 0) {
logger.debugf("i = %s", i);
}
recordId = idGenerator.generateID();
journal.appendAddRecord(recordId, recordType, "test".getBytes(), false);
journal.appendUpdateRecord(recordId, recordType, "update".getBytes(), false);
long tx = idGenerator.generateID();
journal.appendUpdateRecordTransactional(tx, recordId, recordType, "test".getBytes());
journal.appendRollbackRecord(tx, true);
journal.appendDeleteRecord(recordId, false);
try {
barrier.await(1, TimeUnit.SECONDS); // we will wait a little bit, if compacting still busy we carry on...
} catch (Throwable ignored) {
}
}
journal.appendAddRecord(idGenerator.generateID(), recordType, "test".getBytes(), true);
running.set(false);
Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS));
recordId = idGenerator.generateID();
journal.appendAddRecord(recordId, recordType, "test".getBytes(), true);
journal.testCompact();
journal.stop();
AtomicInteger count = new AtomicInteger(0);
journal.start();
ArrayList<RecordInfo> list = new ArrayList<>();
journal.load(new LoaderCallback() {
@Override
public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction) {
logger.warn("Not supposed to have prep TX");
failed.set(true);
}
@Override
public void addRecord(RecordInfo info) {
count.incrementAndGet();
}
@Override
public void deleteRecord(long id) {
logger.warn("Not supposed to delete");
failed.set(true);
}
@Override
public void updateRecord(RecordInfo info) {
logger.warn("Not supposed to update");
}
@Override
public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete) {
logger.warn("Not supposed to failed TX");
}
});
Assert.assertEquals(2, count.get());
Assert.assertFalse(failed.get());
}
@ -561,19 +723,19 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
@Override
public void onCompactDone() {
latchDone.countDown();
instanceLog.debug("Waiting on Compact");
logger.debug("Waiting on Compact");
try {
ActiveMQTestBase.waitForLatch(latchWait);
} catch (InterruptedException e) {
e.printStackTrace();
}
instanceLog.debug("Done");
logger.debug("Done");
}
};
AtomicInteger criticalErrors = new AtomicInteger(0);
journal.setCriticalErrorListener((a, b, c) -> {
System.out.println("Error");
logger.warn("Error", a);
criticalErrors.incrementAndGet();
});
@ -688,17 +850,17 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
int count = 0;
for (long liveID : liveIDs) {
if (count++ % 2 == 0) {
instanceLog.debug("Deleting no trans " + liveID);
logger.debug("Deleting no trans " + liveID);
delete(liveID);
} else {
instanceLog.debug("Deleting TX " + liveID);
logger.debug("Deleting TX " + liveID);
// A Total new transaction (that was created after the compact started) to delete a record that is being
// compacted
deleteTx(transactionID, liveID);
commit(transactionID++);
}
instanceLog.debug("Deletes are going into " + ((JournalImpl) journal).getCurrentFile());
logger.debug("Deletes are going into " + ((JournalImpl) journal).getCurrentFile());
}
}
@ -833,7 +995,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.testCompact();
instanceLog.debug("Debug after compact\n" + journal.debug());
logger.debug("Debug after compact\n" + journal.debug());
stopJournal();
createJournal();
@ -1273,11 +1435,11 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.forceMoveNextFile();
instanceLog.debug("Number of Files: " + journal.getDataFilesCount());
logger.debug("Number of Files: " + journal.getDataFilesCount());
instanceLog.debug("Before compact ****************************");
instanceLog.debug(journal.debug());
instanceLog.debug("*****************************************");
logger.debug("Before compact ****************************");
logger.debug(journal.debug());
logger.debug("*****************************************");
journal.testCompact();
@ -1548,7 +1710,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.forceMoveNextFile();
startCompact();
instanceLog.debug("Committing TX " + tx);
logger.debug("Committing TX " + tx);
commit(tx);
finishCompact();
@ -1598,7 +1760,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.forceMoveNextFile();
startCompact();
instanceLog.debug("Committing TX " + tx1);
logger.debug("Committing TX " + tx1);
rollback(tx0);
for (int i = 0; i < 10; i++) {
addTx(tx1, ids[i]);
@ -1654,7 +1816,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.forceMoveNextFile();
startCompact();
instanceLog.debug("Committing TX " + tx1);
logger.debug("Committing TX " + tx1);
rollback(tx0);
for (int i = 0; i < 10; i++) {
addTx(tx1, ids[i]);
@ -1922,7 +2084,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
try {
while (running.get()) {
Thread.sleep(500);
instanceLog.debug("Compacting");
logger.debug("Compacting");
((JournalImpl) storage.getMessageJournal()).testCompact();
((JournalImpl) storage.getMessageJournal()).checkReclaimStatus();
}