Implement rejections in `WriteMemoryLimits` (#59451)
This commit adds rejections when the indexing memory limits are exceeded for primary or coordinating operations. The amount of bytes allow for indexing is controlled by a new setting `indexing_limits.memory.limit`.
This commit is contained in:
parent
a1cf955dbd
commit
68d56fa7db
|
@ -22,18 +22,24 @@ import org.elasticsearch.action.ActionFuture;
|
|||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalSettingsPlugin;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -42,18 +48,23 @@ import java.util.stream.Stream;
|
|||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
|
||||
public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
||||
|
||||
// TODO: Add additional REST tests when metrics are exposed
|
||||
|
||||
public static final String INDEX_NAME = "test";
|
||||
|
||||
private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build();
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
// Need at least two threads because we are going to block one
|
||||
.put("thread_pool.write.size", 2)
|
||||
.put(unboundedWriteQueue)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -78,28 +89,13 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
|
||||
ensureGreen(INDEX_NAME);
|
||||
|
||||
IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get();
|
||||
String primaryId = Stream.of(response.getShards())
|
||||
.map(ShardStats::getShardRouting)
|
||||
.filter(ShardRouting::primary)
|
||||
.findAny()
|
||||
.get()
|
||||
.currentNodeId();
|
||||
String replicaId = Stream.of(response.getShards())
|
||||
.map(ShardStats::getShardRouting)
|
||||
.filter(sr -> sr.primary() == false)
|
||||
.findAny()
|
||||
.get()
|
||||
.currentNodeId();
|
||||
DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
|
||||
String primaryName = nodes.get(primaryId).getName();
|
||||
String replicaName = nodes.get(replicaId).getName();
|
||||
String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName();
|
||||
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames();
|
||||
String primaryName = primaryReplicaNodeNames.v1();
|
||||
String replicaName = primaryReplicaNodeNames.v2();
|
||||
String coordinatingOnlyNode = getCoordinatingOnlyNode();
|
||||
|
||||
final CountDownLatch replicationSendPointReached = new CountDownLatch(1);
|
||||
final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1);
|
||||
final CountDownLatch newActionsSendPointReached = new CountDownLatch(2);
|
||||
final CountDownLatch latchBlockingReplication = new CountDownLatch(1);
|
||||
|
||||
TransportService primaryService = internalCluster().getInstance(TransportService.class, primaryName);
|
||||
final MockTransportService primaryTransportService = (MockTransportService) primaryService;
|
||||
|
@ -118,6 +114,9 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
|
||||
final ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
|
||||
final Releasable replicaRelease = blockReplicas(replicaThreadPool);
|
||||
|
||||
final BulkRequest bulkRequest = new BulkRequest();
|
||||
int totalRequestSize = 0;
|
||||
for (int i = 0; i < 80; ++i) {
|
||||
|
@ -146,25 +145,6 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
|
||||
ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
|
||||
// Block the replica Write thread pool
|
||||
replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
|
||||
try {
|
||||
newActionsSendPointReached.countDown();
|
||||
latchBlockingReplication.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
});
|
||||
replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
|
||||
try {
|
||||
newActionsSendPointReached.countDown();
|
||||
latchBlockingReplication.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
});
|
||||
newActionsSendPointReached.await();
|
||||
latchBlockingReplicationSend.countDown();
|
||||
|
||||
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
|
||||
|
@ -195,7 +175,7 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(),
|
||||
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));
|
||||
|
||||
latchBlockingReplication.countDown();
|
||||
replicaRelease.close();
|
||||
|
||||
successFuture.actionGet();
|
||||
secondFuture.actionGet();
|
||||
|
@ -210,16 +190,224 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
if (replicationSendPointReached.getCount() > 0) {
|
||||
replicationSendPointReached.countDown();
|
||||
}
|
||||
while (newActionsSendPointReached.getCount() > 0) {
|
||||
newActionsSendPointReached.countDown();
|
||||
}
|
||||
replicaRelease.close();
|
||||
if (latchBlockingReplicationSend.getCount() > 0) {
|
||||
latchBlockingReplicationSend.countDown();
|
||||
}
|
||||
if (latchBlockingReplication.getCount() > 0) {
|
||||
latchBlockingReplication.countDown();
|
||||
}
|
||||
replicaRelease.close();
|
||||
primaryTransportService.clearAllRules();
|
||||
}
|
||||
}
|
||||
|
||||
public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception {
|
||||
final BulkRequest bulkRequest = new BulkRequest();
|
||||
int totalRequestSize = 0;
|
||||
for (int i = 0; i < 80; ++i) {
|
||||
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
|
||||
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
|
||||
totalRequestSize += request.ramBytesUsed();
|
||||
assertTrue(request.ramBytesUsed() > request.source().length());
|
||||
bulkRequest.add(request);
|
||||
}
|
||||
|
||||
final long bulkRequestSize = bulkRequest.ramBytesUsed();
|
||||
final long bulkShardRequestSize = totalRequestSize;
|
||||
restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(),
|
||||
(long)(bulkShardRequestSize * 1.5) + "B").build());
|
||||
|
||||
assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
|
||||
ensureGreen(INDEX_NAME);
|
||||
|
||||
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames();
|
||||
String primaryName = primaryReplicaNodeNames.v1();
|
||||
String replicaName = primaryReplicaNodeNames.v2();
|
||||
String coordinatingOnlyNode = getCoordinatingOnlyNode();
|
||||
|
||||
final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName);
|
||||
try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
|
||||
final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
|
||||
|
||||
WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
|
||||
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
|
||||
WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
|
||||
|
||||
assertBusy(() -> {
|
||||
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
});
|
||||
|
||||
expectThrows(EsRejectedExecutionException.class, () -> {
|
||||
if (randomBoolean()) {
|
||||
client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
|
||||
} else if (randomBoolean()) {
|
||||
client(primaryName).bulk(bulkRequest).actionGet();
|
||||
} else {
|
||||
client(replicaName).bulk(bulkRequest).actionGet();
|
||||
}
|
||||
});
|
||||
|
||||
replicaRelease.close();
|
||||
|
||||
successFuture.actionGet();
|
||||
|
||||
assertEquals(0, primaryWriteLimits.getWriteBytes());
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
}
|
||||
}
|
||||
|
||||
public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception {
|
||||
final BulkRequest bulkRequest = new BulkRequest();
|
||||
int totalRequestSize = 0;
|
||||
for (int i = 0; i < 80; ++i) {
|
||||
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
|
||||
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
|
||||
totalRequestSize += request.ramBytesUsed();
|
||||
assertTrue(request.ramBytesUsed() > request.source().length());
|
||||
bulkRequest.add(request);
|
||||
}
|
||||
final long bulkShardRequestSize = totalRequestSize;
|
||||
restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(),
|
||||
(long)(bulkShardRequestSize * 1.5) + "B").build());
|
||||
|
||||
assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
|
||||
ensureGreen(INDEX_NAME);
|
||||
|
||||
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames();
|
||||
String primaryName = primaryReplicaNodeNames.v1();
|
||||
String replicaName = primaryReplicaNodeNames.v2();
|
||||
String coordinatingOnlyNode = getCoordinatingOnlyNode();
|
||||
|
||||
final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName);
|
||||
try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
|
||||
final ActionFuture<BulkResponse> successFuture = client(primaryName).bulk(bulkRequest);
|
||||
|
||||
WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
|
||||
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
|
||||
WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
|
||||
|
||||
assertBusy(() -> {
|
||||
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
});
|
||||
|
||||
BulkResponse responses = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
|
||||
assertTrue(responses.hasFailures());
|
||||
assertThat(responses.getItems()[0].getFailure().getCause().getCause(), instanceOf(EsRejectedExecutionException.class));
|
||||
|
||||
replicaRelease.close();
|
||||
|
||||
successFuture.actionGet();
|
||||
|
||||
assertEquals(0, primaryWriteLimits.getWriteBytes());
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
}
|
||||
}
|
||||
|
||||
public void testWritesWillSucceedIfBelowThreshold() throws Exception {
|
||||
restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), "1MB").build());
|
||||
assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
|
||||
ensureGreen(INDEX_NAME);
|
||||
|
||||
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames();
|
||||
String replicaName = primaryReplicaNodeNames.v2();
|
||||
String coordinatingOnlyNode = getCoordinatingOnlyNode();
|
||||
|
||||
final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName);
|
||||
try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
|
||||
// The write limits is set to 1MB. We will send up to 800KB to stay below that threshold.
|
||||
int thresholdToStopSending = 800 * 1024;
|
||||
|
||||
ArrayList<ActionFuture<IndexResponse>> responses = new ArrayList<>();
|
||||
int totalRequestSize = 0;
|
||||
while (totalRequestSize < thresholdToStopSending) {
|
||||
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
|
||||
.source(Collections.singletonMap("key", randomAlphaOfLength(500)));
|
||||
totalRequestSize += request.ramBytesUsed();
|
||||
responses.add(client(coordinatingOnlyNode).index(request));
|
||||
}
|
||||
|
||||
replicaRelease.close();
|
||||
|
||||
// Would throw exception if one of the operations was rejected
|
||||
responses.forEach(ActionFuture::actionGet);
|
||||
}
|
||||
}
|
||||
|
||||
private void restartNodesWithSettings(Settings settings) throws Exception {
|
||||
internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
|
||||
@Override
|
||||
public Settings onNodeStopped(String nodeName) {
|
||||
return Settings.builder().put(unboundedWriteQueue).put(settings).build();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private String getCoordinatingOnlyNode() {
|
||||
return client().admin().cluster().prepareState().get().getState().nodes().getCoordinatingOnlyNodes().iterator().next()
|
||||
.value.getName();
|
||||
}
|
||||
|
||||
private Tuple<String, String> getPrimaryReplicaNodeNames() {
|
||||
IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get();
|
||||
String primaryId = Stream.of(response.getShards())
|
||||
.map(ShardStats::getShardRouting)
|
||||
.filter(ShardRouting::primary)
|
||||
.findAny()
|
||||
.get()
|
||||
.currentNodeId();
|
||||
String replicaId = Stream.of(response.getShards())
|
||||
.map(ShardStats::getShardRouting)
|
||||
.filter(sr -> sr.primary() == false)
|
||||
.findAny()
|
||||
.get()
|
||||
.currentNodeId();
|
||||
DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
|
||||
String primaryName = nodes.get(primaryId).getName();
|
||||
String replicaName = nodes.get(replicaId).getName();
|
||||
return new Tuple<>(primaryName, replicaName);
|
||||
}
|
||||
|
||||
private Releasable blockReplicas(ThreadPool threadPool) {
|
||||
final CountDownLatch blockReplication = new CountDownLatch(1);
|
||||
final int threads = threadPool.info(ThreadPool.Names.WRITE).getMax();
|
||||
final CountDownLatch pointReached = new CountDownLatch(threads);
|
||||
for (int i = 0; i< threads; ++i) {
|
||||
threadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
|
||||
try {
|
||||
pointReached.countDown();
|
||||
blockReplication.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return () -> {
|
||||
if (blockReplication.getCount() > 0) {
|
||||
blockReplication.countDown();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,26 +20,70 @@
|
|||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class WriteMemoryLimits {
|
||||
|
||||
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
|
||||
Setting.memorySizeSetting("indexing_limits.memory.limit", "10%", Setting.Property.NodeScope);
|
||||
|
||||
private final AtomicLong writeBytes = new AtomicLong(0);
|
||||
private final AtomicLong replicaWriteBytes = new AtomicLong(0);
|
||||
private final long writeLimits;
|
||||
|
||||
public WriteMemoryLimits(Settings settings) {
|
||||
this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
|
||||
}
|
||||
|
||||
public WriteMemoryLimits(Settings settings, ClusterSettings clusterSettings) {
|
||||
this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
|
||||
}
|
||||
|
||||
public Releasable markWriteOperationStarted(long bytes) {
|
||||
writeBytes.addAndGet(bytes);
|
||||
return () -> writeBytes.getAndAdd(-bytes);
|
||||
return markWriteOperationStarted(bytes, false);
|
||||
}
|
||||
|
||||
public Releasable markWriteOperationStarted(long bytes, boolean forceExecution) {
|
||||
long currentWriteLimits = this.writeLimits;
|
||||
long writeBytes = this.writeBytes.addAndGet(bytes);
|
||||
long replicaWriteBytes = this.replicaWriteBytes.get();
|
||||
long totalBytes = writeBytes + replicaWriteBytes;
|
||||
if (forceExecution == false && totalBytes > currentWriteLimits) {
|
||||
long bytesWithoutOperation = writeBytes - bytes;
|
||||
long totalBytesWithoutOperation = totalBytes - bytes;
|
||||
this.writeBytes.getAndAdd(-bytes);
|
||||
throw new EsRejectedExecutionException("rejected execution of write operation [" +
|
||||
"write_bytes=" + bytesWithoutOperation + ", " +
|
||||
"replica_write_bytes=" + replicaWriteBytes + ", " +
|
||||
"total_write_bytes=" + totalBytesWithoutOperation + ", " +
|
||||
"current_operation_bytes=" + bytes + ", " +
|
||||
"max_write_bytes=" + currentWriteLimits + "]", false);
|
||||
}
|
||||
return () -> this.writeBytes.getAndAdd(-bytes);
|
||||
}
|
||||
|
||||
public long getWriteBytes() {
|
||||
return writeBytes.get();
|
||||
}
|
||||
|
||||
public Releasable markReplicaWriteStarted(long bytes) {
|
||||
replicaWriteBytes.getAndAdd(bytes);
|
||||
return () -> replicaWriteBytes.getAndAdd(-bytes);
|
||||
public Releasable markReplicaWriteStarted(long bytes, boolean forceExecution) {
|
||||
long currentReplicaWriteLimits = (long) (this.writeLimits * 1.5);
|
||||
long replicaWriteBytes = this.replicaWriteBytes.getAndAdd(bytes);
|
||||
if (forceExecution == false && replicaWriteBytes > currentReplicaWriteLimits) {
|
||||
long replicaBytesWithoutOperation = replicaWriteBytes - bytes;
|
||||
this.replicaWriteBytes.getAndAdd(-bytes);
|
||||
throw new EsRejectedExecutionException("rejected execution of replica write operation [" +
|
||||
"replica_write_bytes=" + replicaBytesWithoutOperation + ", " +
|
||||
"current_replica_operation_bytes=" + bytes + ", " +
|
||||
"max_replica_write_bytes=" + currentReplicaWriteLimits + "]", false);
|
||||
}
|
||||
return () -> this.replicaWriteBytes.getAndAdd(-bytes);
|
||||
}
|
||||
|
||||
public long getReplicaWriteBytes() {
|
||||
|
|
|
@ -805,6 +805,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
|
|||
|
||||
@Override
|
||||
public long ramBytesUsed() {
|
||||
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.ramBytesUsed());
|
||||
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.length());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ public abstract class TransportWriteAction<
|
|||
Response extends ReplicationResponse & WriteResponse
|
||||
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
|
||||
|
||||
private final boolean forceExecutionOnPrimary;
|
||||
private final boolean forceExecution;
|
||||
private final WriteMemoryLimits writeMemoryLimits;
|
||||
private final String executor;
|
||||
|
||||
|
@ -74,13 +74,13 @@ public abstract class TransportWriteAction<
|
|||
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
|
||||
this.executor = executor;
|
||||
this.forceExecutionOnPrimary = forceExecutionOnPrimary;
|
||||
this.forceExecution = forceExecutionOnPrimary;
|
||||
this.writeMemoryLimits = writeMemoryLimits;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Releasable checkOperationLimits(Request request) {
|
||||
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
|
||||
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,7 +90,7 @@ public abstract class TransportWriteAction<
|
|||
if (rerouteWasLocal) {
|
||||
return () -> {};
|
||||
} else {
|
||||
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
|
||||
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,7 +100,7 @@ public abstract class TransportWriteAction<
|
|||
|
||||
@Override
|
||||
protected Releasable checkReplicaLimits(ReplicaRequest request) {
|
||||
return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request));
|
||||
return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request), forceExecution);
|
||||
}
|
||||
|
||||
protected long replicaOperationSize(ReplicaRequest request) {
|
||||
|
@ -156,7 +156,7 @@ public abstract class TransportWriteAction<
|
|||
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return forceExecutionOnPrimary;
|
||||
return forceExecution;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.common.settings;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
|
||||
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.action.search.TransportSearchAction;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
|
@ -554,7 +555,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING,
|
||||
FsHealthService.ENABLED_SETTING,
|
||||
FsHealthService.REFRESH_INTERVAL_SETTING,
|
||||
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING)));
|
||||
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
|
||||
WriteMemoryLimits.MAX_INDEXING_BYTES)));
|
||||
|
||||
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
|
||||
|
|
|
@ -595,7 +595,7 @@ public class Node implements Closeable {
|
|||
new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
|
||||
resourcesToClose.add(persistentTasksClusterService);
|
||||
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
|
||||
final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits();
|
||||
final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(settings);
|
||||
|
||||
modules.add(b -> {
|
||||
b.bind(Node.class).toInstance(this);
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
|
@ -120,7 +121,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
|
|||
final ExecutorService direct = EsExecutors.newDirectExecutorService();
|
||||
when(threadPool.executor(anyString())).thenReturn(direct);
|
||||
TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
|
||||
null, null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits()) {
|
||||
null, null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits(Settings.EMPTY)) {
|
||||
@Override
|
||||
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
|
||||
AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
|
||||
|
|
|
@ -142,7 +142,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
|||
new AutoCreateIndex(
|
||||
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||
new IndexNameExpressionResolver()
|
||||
), new WriteMemoryLimits()
|
||||
), new WriteMemoryLimits(SETTINGS)
|
||||
);
|
||||
}
|
||||
@Override
|
||||
|
|
|
@ -81,7 +81,7 @@ public class TransportBulkActionTests extends ESTestCase {
|
|||
super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null,
|
||||
null, new ActionFilters(Collections.emptySet()), new Resolver(),
|
||||
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()),
|
||||
new WriteMemoryLimits());
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -240,7 +240,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
|
|||
actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
autoCreateIndex,
|
||||
new WriteMemoryLimits(),
|
||||
new WriteMemoryLimits(Settings.EMPTY),
|
||||
relativeTimeProvider);
|
||||
}
|
||||
|
||||
|
|
|
@ -145,7 +145,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
|
|||
|
||||
final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
|
||||
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()),
|
||||
new WriteMemoryLimits());
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
|
||||
assertThat(action.globalBlockLevel(), nullValue());
|
||||
assertThat(action.indexBlockLevel(), nullValue());
|
||||
|
|
|
@ -367,7 +367,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||
x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
|
||||
new WriteMemoryLimits());
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
|
||||
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
|
||||
}
|
||||
|
@ -377,7 +377,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
super(settings, actionName, transportService, clusterService,
|
||||
mockIndicesService(clusterService), threadPool, shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
|
||||
new WriteMemoryLimits());
|
||||
new WriteMemoryLimits(settings));
|
||||
this.withDocumentFailureOnPrimary = false;
|
||||
this.withDocumentFailureOnReplica = false;
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new WriteMemoryLimits());
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
action.dispatchedShardOperationOnPrimary(request, indexShard,
|
||||
|
@ -142,7 +142,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new WriteMemoryLimits());
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
|
||||
|
@ -182,7 +182,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new WriteMemoryLimits());
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
|
||||
assertNull(action.indexBlockLevel());
|
||||
}
|
||||
|
|
|
@ -1483,7 +1483,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
new WriteMemoryLimits())),
|
||||
new WriteMemoryLimits(settings))),
|
||||
new GlobalCheckpointSyncAction(
|
||||
settings,
|
||||
transportService,
|
||||
|
@ -1509,7 +1509,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
mappingUpdatedAction.setClient(client);
|
||||
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
|
||||
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
|
||||
actionFilters, new WriteMemoryLimits());
|
||||
actionFilters, new WriteMemoryLimits(settings));
|
||||
actions.put(BulkAction.INSTANCE,
|
||||
new TransportBulkAction(threadPool, transportService, clusterService,
|
||||
new IngestService(
|
||||
|
@ -1517,7 +1517,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
|
||||
Collections.emptyList(), client),
|
||||
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
|
||||
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new WriteMemoryLimits()
|
||||
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new WriteMemoryLimits(settings)
|
||||
));
|
||||
final RestoreService restoreService = new RestoreService(
|
||||
clusterService, repositoriesService, allocationService,
|
||||
|
|
Loading…
Reference in New Issue