ARTEMIS-3327 Reverting 5c051e9832 and adding test to validate

contract with sync.

This reverts commit 5c051e9832.

However this is adding two tests to make sure there won't be a regression on this.
This commit is contained in:
Clebert Suconic 2021-07-07 16:19:08 -04:00 committed by clebertsuconic
parent ff6e1572c4
commit 000f83dbc2
3 changed files with 145 additions and 0 deletions

View File

@ -935,6 +935,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
@ -951,9 +952,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
", usedFile = " +
usedFile);
}
result.set(true);
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendAddRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
logger.error("appendAddRecord::" + e, e);
} finally {
@ -961,6 +965,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
});
result.get();
}

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
@ -117,6 +118,133 @@ public class JournalAsyncTest extends ActiveMQTestBase {
}
}
@Test
public void testAsyncAppendRecord1() throws Exception {
final int JOURNAL_SIZE = 20000;
setupJournal(JOURNAL_SIZE, 100, 5);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latchHoldDirect = new CountDownLatch(1);
try {
factory.setWriteDirectCallback(() -> {
try {
latchHoldDirect.await(10, TimeUnit.MINUTES);
} catch (Throwable ignored) {
}
});
class LocalThread extends Thread {
Exception e;
@Override
public void run() {
try {
journalImpl.appendAddRecord(1, (byte) 1, new SimpleEncoding(1, (byte) 0), true, null);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
this.e = e;
}
}
}
LocalThread t = new LocalThread();
t.start();
Assert.assertFalse("journal.append with sync true should hold until the write is done", latch.await(100, TimeUnit.MILLISECONDS));
Thread.yield();
Assert.assertTrue(t.isAlive());
latchHoldDirect.countDown();
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
t.join();
Assert.assertFalse(t.isAlive());
if (t.e != null) {
throw t.e;
}
} finally {
latchHoldDirect.countDown();
}
}
@Test
public void testAsyncAppendRecord2() throws Exception {
final int JOURNAL_SIZE = 20000;
setupJournal(JOURNAL_SIZE, 100, 5);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latchHoldDirect = new CountDownLatch(1);
try {
factory.setWriteDirectCallback(() -> {
try {
latchHoldDirect.await(10, TimeUnit.MINUTES);
} catch (Throwable ignored) {
}
});
class LocalThread extends Thread {
Exception e;
@Override
public void run() {
try {
journalImpl.appendAddRecord(1, (byte) 1, new SimpleEncoding(1, (byte) 0), true, new IOCompletion() {
@Override
public void storeLineUp() {
}
@Override
public void done() {
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
this.e = e;
}
}
}
LocalThread t = new LocalThread();
t.start();
Assert.assertTrue("journal.append with sync true and IOContext should not hold thread", latch.await(10, TimeUnit.SECONDS));
latchHoldDirect.countDown();
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
t.join();
Assert.assertFalse(t.isAlive());
if (t.e != null) {
throw t.e;
}
} finally {
latchHoldDirect.countDown();
}
}
// If a callback error already arrived, we should just throw the exception right away
@Test
public void testPreviousError() throws Exception {

View File

@ -43,6 +43,9 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
private final boolean supportsCallback;
private Runnable writeDirectCallback;
private volatile boolean holdCallbacks;
private ListenerHoldCallback holdCallbackListener;
@ -81,6 +84,10 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
return 1;
}
public void setWriteDirectCallback(Runnable writeDirectCallback) {
this.writeDirectCallback = writeDirectCallback;
}
// Public --------------------------------------------------------
@Override
@ -420,6 +427,10 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
action.run();
}
if (writeDirectCallback != null) {
writeDirectCallback.run();
}
}
@Override