ARTEMIS-2949 Reduce GC on OperationContext::checkTasks

This commit is contained in:
franz1981 2020-10-14 10:35:40 +02:00 committed by Clebert Suconic
parent d9545540e2
commit 6932b4674d
1 changed files with 89 additions and 32 deletions

View File

@ -16,9 +16,7 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -70,8 +68,8 @@ public class OperationContextImpl implements OperationContext {
OperationContextImpl.threadLocalContext.set(context);
}
private List<TaskHolder> tasks;
private List<TaskHolder> storeOnlyTasks;
private LinkedList<TaskHolder> tasks;
private LinkedList<StoreOnlyTaskHolder> storeOnlyTasks;
private long minimalStore = Long.MAX_VALUE;
private long minimalReplicated = Long.MAX_VALUE;
@ -177,8 +175,11 @@ public class OperationContextImpl implements OperationContext {
}
} else {
if (storeOnly) {
storeOnlyTasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
} else {
// ensure total ordering
assert validateTasksAdd(storeLined, replicationLined, pageLined);
tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
}
}
@ -191,41 +192,75 @@ public class OperationContextImpl implements OperationContext {
}
private boolean validateTasksAdd(int storeLined, int replicationLined, int pageLined) {
if (tasks.isEmpty()) {
return true;
}
final TaskHolder holder = tasks.peekLast();
if (holder.storeLined > storeLined ||
holder.replicationLined > replicationLined ||
holder.pageLined > pageLined) {
return false;
}
return true;
}
@Override
public synchronized void done() {
stored++;
checkTasks();
}
private void checkStoreTasks() {
final LinkedList<StoreOnlyTaskHolder> storeOnlyTasks = this.storeOnlyTasks;
assert storeOnlyTasks != null;
final int size = storeOnlyTasks.size();
if (size == 0) {
return;
}
final long stored = this.stored;
for (int i = 0; i < size; i++) {
final StoreOnlyTaskHolder holder = storeOnlyTasks.peek();
if (holder.storeLined < stored) {
// fail fast: storeOnlyTasks are ordered by storeLined, there is no need to continue
return;
}
// If set, we use an executor to avoid the server being single threaded
execute(holder.task);
final StoreOnlyTaskHolder removed = storeOnlyTasks.poll();
assert removed == holder;
}
}
private void checkCompleteContext() {
final LinkedList<TaskHolder> tasks = this.tasks;
assert tasks != null;
final int size = this.tasks.size();
if (size == 0) {
return;
}
assert size >= 1;
// no need to use an iterator here, we can save that cost
for (int i = 0; i < size; i++) {
final TaskHolder holder = tasks.peek();
if (stored < holder.storeLined || replicated < holder.replicationLined || paged < holder.pageLined) {
// End of list here. No other task will be completed after this
return;
}
execute(holder.task);
final TaskHolder removed = tasks.poll();
assert removed == holder;
}
}
private void checkTasks() {
if (storeOnlyTasks != null) {
Iterator<TaskHolder> iter = storeOnlyTasks.iterator();
while (iter.hasNext()) {
TaskHolder holder = iter.next();
if (stored >= holder.storeLined) {
// If set, we use an executor to avoid the server being single threaded
execute(holder.task);
iter.remove();
}
}
checkStoreTasks();
}
if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) {
Iterator<TaskHolder> iter = tasks.iterator();
while (iter.hasNext()) {
TaskHolder holder = iter.next();
if (stored >= holder.storeLined && replicated >= holder.replicationLined && paged >= holder.pageLined) {
// If set, we use an executor to avoid the server being single threaded
execute(holder.task);
iter.remove();
} else {
// End of list here. No other task will be completed after this
break;
}
}
checkCompleteContext();
}
}
@ -267,11 +302,11 @@ public class OperationContextImpl implements OperationContext {
this.errorMessage = errorMessage;
if (tasks != null) {
Iterator<TaskHolder> iter = tasks.iterator();
while (iter.hasNext()) {
TaskHolder holder = iter.next();
// it's saving the Iterator allocation cost
final int size = tasks.size();
for (int i = 0; i < size; i++) {
final TaskHolder holder = tasks.poll();
holder.task.onError(errorCode, errorMessage);
iter.remove();
}
}
}
@ -304,6 +339,28 @@ public class OperationContextImpl implements OperationContext {
}
}
/**
* This class has been created to both better capture the intention that the {@link IOCallback} is related to a
* store-only operation and to reduce the memory footprint for store-only cases, given that many fields of
* {@link TaskHolder} are not necessary for this to work. Inheritance proved to not as effective especially without
* COOPS and with a 64 bit JVM so we've used different classes.
*/
static final class StoreOnlyTaskHolder {
@Override
public String toString() {
return "StoreOnlyTaskHolder [storeLined=" + storeLined + ", task=" + task + "]";
}
final int storeLined;
final IOCallback task;
StoreOnlyTaskHolder(final IOCallback task, int storeLined) {
this.storeLined = storeLined;
this.task = task;
}
}
@Override
public void waitCompletion() throws Exception {
waitCompletion(0);