ARTEMIS-3519 OperationContext not respecting store lineups and dones on store only

This commit is contained in:
Clebert Suconic 2021-10-06 03:19:09 -04:00 committed by clebertsuconic
parent baf89d947d
commit ef9011a83c
2 changed files with 203 additions and 59 deletions

View File

@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -68,20 +68,35 @@ public class OperationContextImpl implements OperationContext {
OperationContextImpl.threadLocalContext.set(context);
}
private LinkedList<TaskHolder> tasks;
private LinkedList<StoreOnlyTaskHolder> storeOnlyTasks;
LinkedList<TaskHolder> tasks;
LinkedList<StoreOnlyTaskHolder> storeOnlyTasks;
private long minimalStore = Long.MAX_VALUE;
private long minimalReplicated = Long.MAX_VALUE;
private long minimalPage = Long.MAX_VALUE;
long minimalStore = Long.MAX_VALUE;
long minimalReplicated = Long.MAX_VALUE;
long minimalPage = Long.MAX_VALUE;
private final AtomicLong storeLineUp = new AtomicLong(0);
private final AtomicLong replicationLineUp = new AtomicLong(0);
private final AtomicLong pageLineUp = new AtomicLong(0);
private long stored = 0;
private long replicated = 0;
private long paged = 0;
static final AtomicIntegerFieldUpdater<OperationContextImpl> EXECUTORS_PENDING_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(OperationContextImpl.class, "executorsPendingField");
static final AtomicLongFieldUpdater<OperationContextImpl> STORE_LINEUP_UPDATER = AtomicLongFieldUpdater
.newUpdater(OperationContextImpl.class, "storeLineUpField");
static final AtomicLongFieldUpdater<OperationContextImpl> REPLICATION_LINEUP_UPDATER = AtomicLongFieldUpdater
.newUpdater(OperationContextImpl.class, "replicationLineUpField");
static final AtomicLongFieldUpdater<OperationContextImpl> PAGE_LINEUP_UPDATER = AtomicLongFieldUpdater
.newUpdater(OperationContextImpl.class, "pageLineUpField");
volatile int executorsPendingField = 0;
volatile long storeLineUpField = 0;
volatile long replicationLineUpField = 0;
volatile long pageLineUpField = 0;
long stored = 0;
long replicated = 0;
long paged = 0;
private int errorCode = -1;
@ -89,8 +104,6 @@ public class OperationContextImpl implements OperationContext {
private final Executor executor;
private final AtomicInteger executorsPending = new AtomicInteger(0);
public OperationContextImpl(final Executor executor) {
super();
this.executor = executor;
@ -98,7 +111,7 @@ public class OperationContextImpl implements OperationContext {
@Override
public void pageSyncLineUp() {
pageLineUp.incrementAndGet();
PAGE_LINEUP_UPDATER.incrementAndGet(this);
}
@Override
@ -109,12 +122,12 @@ public class OperationContextImpl implements OperationContext {
@Override
public void storeLineUp() {
storeLineUp.incrementAndGet();
STORE_LINEUP_UPDATER.incrementAndGet(this);
}
@Override
public void replicationLineUp() {
replicationLineUp.incrementAndGet();
REPLICATION_LINEUP_UPDATER.incrementAndGet(this);
}
@Override
@ -134,10 +147,9 @@ public class OperationContextImpl implements OperationContext {
synchronized (this) {
if (errorCode == -1) {
final int UNDEFINED = Integer.MIN_VALUE;
int storeLined = UNDEFINED;
int pageLined = UNDEFINED;
int replicationLined = UNDEFINED;
final long storeLined = STORE_LINEUP_UPDATER.get(this);
final long pageLined = PAGE_LINEUP_UPDATER.get(this);
final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
if (storeOnly) {
if (storeOnlyTasks == null) {
storeOnlyTasks = new LinkedList<>();
@ -145,24 +157,18 @@ public class OperationContextImpl implements OperationContext {
} else {
if (tasks == null) {
tasks = new LinkedList<>();
minimalReplicated = (replicationLined = replicationLineUp.intValue());
minimalStore = (storeLined = storeLineUp.intValue());
minimalPage = (pageLined = pageLineUp.intValue());
minimalReplicated = replicationLined;
minimalStore = storeLined;
minimalPage = pageLined;
}
}
//On the next branches each of them is been used
if (replicationLined == UNDEFINED) {
replicationLined = replicationLineUp.intValue();
storeLined = storeLineUp.intValue();
pageLined = pageLineUp.intValue();
}
// On this case, we can just execute the context directly
if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
// We need to use the executor on this case
if (executorsPending.get() == 0) {
if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
// No need to use an executor here or a context switch
// there are no actions pending.. hence we can just execute the task directly on the same thread
executeNow = true;
@ -191,7 +197,7 @@ public class OperationContextImpl implements OperationContext {
}
private boolean validateTasksAdd(int storeLined, int replicationLined, int pageLined) {
private boolean validateTasksAdd(long storeLined, long replicationLined, long pageLined) {
if (tasks.isEmpty()) {
return true;
}
@ -220,7 +226,7 @@ public class OperationContextImpl implements OperationContext {
final long stored = this.stored;
for (int i = 0; i < size; i++) {
final StoreOnlyTaskHolder holder = storeOnlyTasks.peek();
if (holder.storeLined < stored) {
if (stored < holder.storeLined) {
// fail fast: storeOnlyTasks are ordered by storeLined, there is no need to continue
return;
}
@ -267,7 +273,7 @@ public class OperationContextImpl implements OperationContext {
* @param task
*/
private void execute(final IOCallback task) {
executorsPending.incrementAndGet();
EXECUTORS_PENDING_UPDATER.incrementAndGet(this);
try {
executor.execute(new Runnable() {
@Override
@ -277,13 +283,13 @@ public class OperationContextImpl implements OperationContext {
OperationContextImpl.clearContext();
task.done();
} finally {
executorsPending.decrementAndGet();
EXECUTORS_PENDING_UPDATER.decrementAndGet(OperationContextImpl.this);
}
}
});
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorExecutingAIOCallback(e);
executorsPending.decrementAndGet();
EXECUTORS_PENDING_UPDATER.decrementAndGet(this);
task.onError(ActiveMQExceptionType.INTERNAL_ERROR.getCode(), "It wasn't possible to complete IO operation - " + e.getMessage());
}
}
@ -324,13 +330,13 @@ public class OperationContextImpl implements OperationContext {
"]";
}
final int storeLined;
final int replicationLined;
final int pageLined;
long storeLined;
long replicationLined;
long pageLined;
final IOCallback task;
TaskHolder(final IOCallback task, int storeLined, int replicationLined, int pageLined) {
TaskHolder(final IOCallback task, long storeLined, long replicationLined, long pageLined) {
this.storeLined = storeLined;
this.replicationLined = replicationLined;
this.pageLined = pageLined;
@ -351,10 +357,10 @@ public class OperationContextImpl implements OperationContext {
return "StoreOnlyTaskHolder [storeLined=" + storeLined + ", task=" + task + "]";
}
final int storeLined;
long storeLined;
final IOCallback task;
StoreOnlyTaskHolder(final IOCallback task, int storeLined) {
StoreOnlyTaskHolder(final IOCallback task, long storeLined) {
this.storeLined = storeLined;
this.task = task;
}
@ -389,13 +395,13 @@ public class OperationContextImpl implements OperationContext {
return "OperationContextImpl [" + hashCode() + "] [minimalStore=" + minimalStore +
", storeLineUp=" +
storeLineUp +
storeLineUpField +
", stored=" +
stored +
", minimalReplicated=" +
minimalReplicated +
", replicationLineUp=" +
replicationLineUp +
replicationLineUpField +
", replicated=" +
replicated +
", paged=" +
@ -403,13 +409,13 @@ public class OperationContextImpl implements OperationContext {
", minimalPage=" +
minimalPage +
", pageLineUp=" +
pageLineUp +
pageLineUpField +
", errorCode=" +
errorCode +
", errorMessage=" +
errorMessage +
", executorsPending=" +
executorsPending +
executorsPendingField +
", executor=" + this.executor +
"]" + buffer.toString();
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.unit.core.persistence.impl;
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
@ -33,16 +32,6 @@ import org.junit.Test;
public class OperationContextUnitTest extends ActiveMQTestBase {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Test
public void testCompleteTaskAfterPaging() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@ -101,6 +90,155 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
}
}
@Test
public void testCompleteTaskStoreOnly() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
try {
OperationContextImpl impl = new OperationContextImpl(executor);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
final CountDownLatch latch3 = new CountDownLatch(1);
impl.storeLineUp();
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
latch1.countDown();
}
}, true);
impl.storeLineUp();
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
latch3.countDown();
}
}, true);
impl.done();
assertTrue(latch1.await(10, TimeUnit.SECONDS));
assertFalse(latch3.await(1, TimeUnit.MILLISECONDS));
impl.done();
assertTrue(latch3.await(10, TimeUnit.SECONDS));
for (int i = 0; i < 10; i++)
impl.storeLineUp();
for (int i = 0; i < 3; i++)
impl.pageSyncLineUp();
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
latch2.countDown();
}
}, true);
assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
for (int i = 0; i < 9; i++)
impl.done();
assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
impl.done();
assertTrue(latch2.await(10, TimeUnit.SECONDS));
} finally {
executor.shutdown();
}
}
@Test
public void testCompletionLateStoreOnly() throws Exception {
testCompletionLate(true);
}
@Test
public void testCompletionLate() throws Exception {
testCompletionLate(false);
}
private void testCompletionLate(boolean storeOnly) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
try {
OperationContextImpl impl = new OperationContextImpl(executor);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
if (storeOnly) {
// if storeOnly, then the pageSyncLinup and replication lineup should not bother the results
impl.pageSyncLineUp();
impl.replicationLineUp();
}
impl.storeLineUp();
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
latch1.countDown();
}
}, storeOnly);
impl.storeLineUpField = 350000;
impl.stored = impl.storeLineUpField - 1;
if (impl.tasks != null) {
impl.tasks.forEach((t) -> t.storeLined = 150000L);
}
if (impl.storeOnlyTasks != null) {
impl.storeOnlyTasks.forEach((t) -> t.storeLined = 150000L);
}
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
latch2.countDown();
}
}, storeOnly);
impl.done();
assertTrue(latch1.await(10, TimeUnit.SECONDS));
assertTrue(latch2.await(10, TimeUnit.SECONDS));
} finally {
executor.shutdown();
}
}
@Test
public void testErrorNotLostOnPageSyncError() throws Exception {