ARTEMIS-3311 - ensure visibility of error state on operation context callback registration, fix and test

This commit is contained in:
gtully 2021-05-20 21:03:01 +01:00 committed by Gary Tully
parent 23a28dc1d8
commit 72c9cae8e1
2 changed files with 112 additions and 48 deletions

View File

@ -130,63 +130,62 @@ public class OperationContextImpl implements OperationContext {
@Override
public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) {
if (errorCode != -1) {
completion.onError(errorCode, errorMessage);
return;
}
boolean executeNow = false;
synchronized (this) {
final int UNDEFINED = Integer.MIN_VALUE;
int storeLined = UNDEFINED;
int pageLined = UNDEFINED;
int replicationLined = UNDEFINED;
if (storeOnly) {
if (storeOnlyTasks == null) {
storeOnlyTasks = new LinkedList<>();
}
} else {
if (tasks == null) {
tasks = new LinkedList<>();
minimalReplicated = (replicationLined = replicationLineUp.intValue());
minimalStore = (storeLined = storeLineUp.intValue());
minimalPage = (pageLined = pageLineUp.intValue());
}
}
//On the next branches each of them is been used
if (replicationLined == UNDEFINED) {
replicationLined = replicationLineUp.intValue();
storeLined = storeLineUp.intValue();
pageLined = pageLineUp.intValue();
}
// On this case, we can just execute the context directly
if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
// We need to use the executor on this case
if (executorsPending.get() == 0) {
// No need to use an executor here or a context switch
// there are no actions pending.. hence we can just execute the task directly on the same thread
executeNow = true;
} else {
execute(completion);
}
} else {
if (errorCode == -1) {
final int UNDEFINED = Integer.MIN_VALUE;
int storeLined = UNDEFINED;
int pageLined = UNDEFINED;
int replicationLined = UNDEFINED;
if (storeOnly) {
assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
if (storeOnlyTasks == null) {
storeOnlyTasks = new LinkedList<>();
}
} else {
// ensure total ordering
assert validateTasksAdd(storeLined, replicationLined, pageLined);
tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
if (tasks == null) {
tasks = new LinkedList<>();
minimalReplicated = (replicationLined = replicationLineUp.intValue());
minimalStore = (storeLined = storeLineUp.intValue());
minimalPage = (pageLined = pageLineUp.intValue());
}
}
//On the next branches each of them is been used
if (replicationLined == UNDEFINED) {
replicationLined = replicationLineUp.intValue();
storeLined = storeLineUp.intValue();
pageLined = pageLineUp.intValue();
}
// On this case, we can just execute the context directly
if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
// We need to use the executor on this case
if (executorsPending.get() == 0) {
// No need to use an executor here or a context switch
// there are no actions pending.. hence we can just execute the task directly on the same thread
executeNow = true;
} else {
execute(completion);
}
} else {
if (storeOnly) {
assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
} else {
// ensure total ordering
assert validateTasksAdd(storeLined, replicationLined, pageLined);
tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
}
}
}
}
if (executeNow) {
// Executing outside of any locks
// Executing outside of any locks
if (errorCode != -1) {
completion.onError(errorCode, errorMessage);
} else if (executeNow) {
completion.done();
}

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
@ -100,6 +101,70 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
}
}
@Test
public void testErrorNotLostOnPageSyncError() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
ExecutorService pageSyncTimer = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
class PageWriteErrorJob implements Runnable {
final OperationContextImpl operationContext;
PageWriteErrorJob(OperationContextImpl impl) {
impl.pageSyncLineUp();
operationContext = impl;
}
@Override
public void run() {
try {
operationContext.onError(10, "bla");
} finally {
operationContext.pageSyncDone();
}
}
}
try {
final int numJobs = 10000;
final CountDownLatch errorsOnLateRegister = new CountDownLatch(numJobs);
for (int i = 0; i < numJobs; i++) {
final OperationContextImpl impl = new OperationContextImpl(executor);
final CountDownLatch done = new CountDownLatch(1);
pageSyncTimer.execute(new PageWriteErrorJob(impl));
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
errorsOnLateRegister.countDown();
done.countDown();
}
@Override
public void done() {
done.countDown();
}
});
done.await();
}
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return errorsOnLateRegister.await(1, TimeUnit.SECONDS);
}
}));
} finally {
executor.shutdown();
pageSyncTimer.shutdown();
}
}
@Test
public void testCaptureExceptionOnExecutor() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());