Ensure IndexingPressure memory is re-adjusted once (#67673)

We have seen a case where the memory of IndexingPressure was
re-adjusted twice. With this commit, we will log that error with a
stack trace so that we can figure out the source of the issue.
This commit is contained in:
Nhat Nguyen 2021-01-19 12:48:37 -05:00
parent 7279f28344
commit 34b7497c0c
2 changed files with 24 additions and 8 deletions

View File

@ -19,6 +19,8 @@
package org.elasticsearch.index; package org.elasticsearch.index;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -26,6 +28,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.stats.IndexingPressureStats; import org.elasticsearch.index.stats.IndexingPressureStats;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
public class IndexingPressure { public class IndexingPressure {
@ -33,6 +36,8 @@ public class IndexingPressure {
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES = public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope); Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope);
private static final Logger logger = LogManager.getLogger(IndexingPressure.class);
private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
private final AtomicLong currentCoordinatingBytes = new AtomicLong(0); private final AtomicLong currentCoordinatingBytes = new AtomicLong(0);
private final AtomicLong currentPrimaryBytes = new AtomicLong(0); private final AtomicLong currentPrimaryBytes = new AtomicLong(0);
@ -55,6 +60,19 @@ public class IndexingPressure {
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5); this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
} }
private static Releasable wrapReleasable(Releasable releasable) {
final AtomicBoolean called = new AtomicBoolean();
return () -> {
if (called.compareAndSet(false, true)) {
releasable.close();
} else {
logger.error("IndexingPressure memory is adjusted twice", new IllegalStateException("Releasable is called twice"));
assert false : "IndexingPressure is adjusted twice";
}
};
}
public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) { public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long replicaWriteBytes = this.currentReplicaBytes.get(); long replicaWriteBytes = this.currentReplicaBytes.get();
@ -74,16 +92,16 @@ public class IndexingPressure {
currentCoordinatingBytes.getAndAdd(bytes); currentCoordinatingBytes.getAndAdd(bytes);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes); totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalCoordinatingBytes.getAndAdd(bytes); totalCoordinatingBytes.getAndAdd(bytes);
return () -> { return wrapReleasable(() -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentCoordinatingBytes.getAndAdd(-bytes); this.currentCoordinatingBytes.getAndAdd(-bytes);
}; });
} }
public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(long bytes) { public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(long bytes) {
currentPrimaryBytes.getAndAdd(bytes); currentPrimaryBytes.getAndAdd(bytes);
totalPrimaryBytes.getAndAdd(bytes); totalPrimaryBytes.getAndAdd(bytes);
return () -> this.currentPrimaryBytes.getAndAdd(-bytes); return wrapReleasable(() -> this.currentPrimaryBytes.getAndAdd(-bytes));
} }
public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution) { public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution) {
@ -105,10 +123,10 @@ public class IndexingPressure {
currentPrimaryBytes.getAndAdd(bytes); currentPrimaryBytes.getAndAdd(bytes);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes); totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalPrimaryBytes.getAndAdd(bytes); totalPrimaryBytes.getAndAdd(bytes);
return () -> { return wrapReleasable(() -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentPrimaryBytes.getAndAdd(-bytes); this.currentPrimaryBytes.getAndAdd(-bytes);
}; });
} }
public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) { public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) {
@ -123,7 +141,7 @@ public class IndexingPressure {
"max_replica_bytes=" + replicaLimits + "]", false); "max_replica_bytes=" + replicaLimits + "]", false);
} }
totalReplicaBytes.getAndAdd(bytes); totalReplicaBytes.getAndAdd(bytes);
return () -> this.currentReplicaBytes.getAndAdd(-bytes); return wrapReleasable(() -> this.currentReplicaBytes.getAndAdd(-bytes));
} }
public long getCurrentCombinedCoordinatingAndPrimaryBytes() { public long getCurrentCombinedCoordinatingAndPrimaryBytes() {

View File

@ -126,9 +126,7 @@ public class IndexingPressureTests extends ESTestCase {
assertEquals(1, indexingPressure.stats().getReplicaRejections()); assertEquals(1, indexingPressure.stats().getReplicaRejections());
assertEquals(1024 * 14, indexingPressure.stats().getCurrentReplicaBytes()); assertEquals(1024 * 14, indexingPressure.stats().getCurrentReplicaBytes());
forced.close(); forced.close();
replica2.close(); replica2.close();
forced.close();
} }
assertEquals(1024 * 14, indexingPressure.stats().getTotalReplicaBytes()); assertEquals(1024 * 14, indexingPressure.stats().getTotalReplicaBytes());