diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index 37ef1c06d1..626f467750 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -130,63 +130,62 @@ public class OperationContextImpl implements OperationContext { @Override public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) { - if (errorCode != -1) { - completion.onError(errorCode, errorMessage); - return; - } - boolean executeNow = false; synchronized (this) { - final int UNDEFINED = Integer.MIN_VALUE; - int storeLined = UNDEFINED; - int pageLined = UNDEFINED; - int replicationLined = UNDEFINED; - if (storeOnly) { - if (storeOnlyTasks == null) { - storeOnlyTasks = new LinkedList<>(); - } - } else { - if (tasks == null) { - tasks = new LinkedList<>(); - minimalReplicated = (replicationLined = replicationLineUp.intValue()); - minimalStore = (storeLined = storeLineUp.intValue()); - minimalPage = (pageLined = pageLineUp.intValue()); - } - } - //On the next branches each of them is been used - if (replicationLined == UNDEFINED) { - replicationLined = replicationLineUp.intValue(); - storeLined = storeLineUp.intValue(); - pageLined = pageLineUp.intValue(); - } - // On this case, we can just execute the context directly - - if (replicationLined == replicated && storeLined == stored && pageLined == paged) { - // We want to avoid the executor if everything is complete... - // However, we can't execute the context if there are executions pending - // We need to use the executor on this case - if (executorsPending.get() == 0) { - // No need to use an executor here or a context switch - // there are no actions pending.. hence we can just execute the task directly on the same thread - executeNow = true; - } else { - execute(completion); - } - } else { + if (errorCode == -1) { + final int UNDEFINED = Integer.MIN_VALUE; + int storeLined = UNDEFINED; + int pageLined = UNDEFINED; + int replicationLined = UNDEFINED; if (storeOnly) { - assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true; - storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined)); + if (storeOnlyTasks == null) { + storeOnlyTasks = new LinkedList<>(); + } } else { - // ensure total ordering - assert validateTasksAdd(storeLined, replicationLined, pageLined); - tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined)); + if (tasks == null) { + tasks = new LinkedList<>(); + minimalReplicated = (replicationLined = replicationLineUp.intValue()); + minimalStore = (storeLined = storeLineUp.intValue()); + minimalPage = (pageLined = pageLineUp.intValue()); + } + } + //On the next branches each of them is been used + if (replicationLined == UNDEFINED) { + replicationLined = replicationLineUp.intValue(); + storeLined = storeLineUp.intValue(); + pageLined = pageLineUp.intValue(); + } + // On this case, we can just execute the context directly + + if (replicationLined == replicated && storeLined == stored && pageLined == paged) { + // We want to avoid the executor if everything is complete... + // However, we can't execute the context if there are executions pending + // We need to use the executor on this case + if (executorsPending.get() == 0) { + // No need to use an executor here or a context switch + // there are no actions pending.. hence we can just execute the task directly on the same thread + executeNow = true; + } else { + execute(completion); + } + } else { + if (storeOnly) { + assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true; + storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined)); + } else { + // ensure total ordering + assert validateTasksAdd(storeLined, replicationLined, pageLined); + tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined)); + } } } } - if (executeNow) { - // Executing outside of any locks + // Executing outside of any locks + if (errorCode != -1) { + completion.onError(errorCode, errorMessage); + } else if (executeNow) { completion.done(); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java index 83c6bcdcec..94d6fa5c0d 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.Wait; import org.junit.Assert; import org.junit.Test; @@ -100,6 +101,70 @@ public class OperationContextUnitTest extends ActiveMQTestBase { } } + @Test + public void testErrorNotLostOnPageSyncError() throws Exception { + + ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); + ExecutorService pageSyncTimer = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); + + class PageWriteErrorJob implements Runnable { + final OperationContextImpl operationContext; + PageWriteErrorJob(OperationContextImpl impl) { + impl.pageSyncLineUp(); + operationContext = impl; + } + + @Override + public void run() { + try { + operationContext.onError(10, "bla"); + } finally { + operationContext.pageSyncDone(); + } + } + } + + try { + final int numJobs = 10000; + final CountDownLatch errorsOnLateRegister = new CountDownLatch(numJobs); + + for (int i = 0; i < numJobs; i++) { + final OperationContextImpl impl = new OperationContextImpl(executor); + + final CountDownLatch done = new CountDownLatch(1); + + pageSyncTimer.execute(new PageWriteErrorJob(impl)); + impl.executeOnCompletion(new IOCallback() { + + @Override + public void onError(int errorCode, String errorMessage) { + errorsOnLateRegister.countDown(); + done.countDown(); + } + + @Override + public void done() { + done.countDown(); + } + }); + + done.await(); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return errorsOnLateRegister.await(1, TimeUnit.SECONDS); + } + })); + + + } finally { + executor.shutdown(); + pageSyncTimer.shutdown(); + } + } + @Test public void testCaptureExceptionOnExecutor() throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());