diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index 82c381c8ce..37ef1c06d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -16,9 +16,7 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal; -import java.util.Iterator; import java.util.LinkedList; -import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -70,8 +68,8 @@ public class OperationContextImpl implements OperationContext { OperationContextImpl.threadLocalContext.set(context); } - private List tasks; - private List storeOnlyTasks; + private LinkedList tasks; + private LinkedList storeOnlyTasks; private long minimalStore = Long.MAX_VALUE; private long minimalReplicated = Long.MAX_VALUE; @@ -177,8 +175,11 @@ public class OperationContextImpl implements OperationContext { } } else { if (storeOnly) { - storeOnlyTasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined)); + assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true; + storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined)); } else { + // ensure total ordering + assert validateTasksAdd(storeLined, replicationLined, pageLined); tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined)); } } @@ -191,41 +192,75 @@ public class OperationContextImpl implements OperationContext { } + private boolean validateTasksAdd(int storeLined, int replicationLined, int pageLined) { + if (tasks.isEmpty()) { + return true; + } + final TaskHolder holder = tasks.peekLast(); + if (holder.storeLined > storeLined || + holder.replicationLined > replicationLined || + holder.pageLined > pageLined) { + return false; + } + return true; + } + @Override public synchronized void done() { stored++; checkTasks(); } + private void checkStoreTasks() { + final LinkedList storeOnlyTasks = this.storeOnlyTasks; + assert storeOnlyTasks != null; + final int size = storeOnlyTasks.size(); + if (size == 0) { + return; + } + final long stored = this.stored; + for (int i = 0; i < size; i++) { + final StoreOnlyTaskHolder holder = storeOnlyTasks.peek(); + if (holder.storeLined < stored) { + // fail fast: storeOnlyTasks are ordered by storeLined, there is no need to continue + return; + } + // If set, we use an executor to avoid the server being single threaded + execute(holder.task); + final StoreOnlyTaskHolder removed = storeOnlyTasks.poll(); + assert removed == holder; + } + } + + private void checkCompleteContext() { + final LinkedList tasks = this.tasks; + assert tasks != null; + final int size = this.tasks.size(); + if (size == 0) { + return; + } + assert size >= 1; + // no need to use an iterator here, we can save that cost + for (int i = 0; i < size; i++) { + final TaskHolder holder = tasks.peek(); + if (stored < holder.storeLined || replicated < holder.replicationLined || paged < holder.pageLined) { + // End of list here. No other task will be completed after this + return; + } + execute(holder.task); + final TaskHolder removed = tasks.poll(); + assert removed == holder; + } + } + private void checkTasks() { if (storeOnlyTasks != null) { - Iterator iter = storeOnlyTasks.iterator(); - while (iter.hasNext()) { - TaskHolder holder = iter.next(); - if (stored >= holder.storeLined) { - // If set, we use an executor to avoid the server being single threaded - execute(holder.task); - - iter.remove(); - } - } + checkStoreTasks(); } if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) { - Iterator iter = tasks.iterator(); - while (iter.hasNext()) { - TaskHolder holder = iter.next(); - if (stored >= holder.storeLined && replicated >= holder.replicationLined && paged >= holder.pageLined) { - // If set, we use an executor to avoid the server being single threaded - execute(holder.task); - - iter.remove(); - } else { - // End of list here. No other task will be completed after this - break; - } - } + checkCompleteContext(); } } @@ -267,11 +302,11 @@ public class OperationContextImpl implements OperationContext { this.errorMessage = errorMessage; if (tasks != null) { - Iterator iter = tasks.iterator(); - while (iter.hasNext()) { - TaskHolder holder = iter.next(); + // it's saving the Iterator allocation cost + final int size = tasks.size(); + for (int i = 0; i < size; i++) { + final TaskHolder holder = tasks.poll(); holder.task.onError(errorCode, errorMessage); - iter.remove(); } } } @@ -304,6 +339,28 @@ public class OperationContextImpl implements OperationContext { } } + /** + * This class has been created to both better capture the intention that the {@link IOCallback} is related to a + * store-only operation and to reduce the memory footprint for store-only cases, given that many fields of + * {@link TaskHolder} are not necessary for this to work. Inheritance proved to not as effective especially without + * COOPS and with a 64 bit JVM so we've used different classes. + */ + static final class StoreOnlyTaskHolder { + + @Override + public String toString() { + return "StoreOnlyTaskHolder [storeLined=" + storeLined + ", task=" + task + "]"; + } + + final int storeLined; + final IOCallback task; + + StoreOnlyTaskHolder(final IOCallback task, int storeLined) { + this.storeLined = storeLined; + this.task = task; + } + } + @Override public void waitCompletion() throws Exception { waitCompletion(0);