diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java index 6dab3a5cb04..b569f3c772b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java @@ -22,18 +22,24 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -42,18 +48,23 @@ import java.util.stream.Stream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D) +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D) public class WriteMemoryLimitsIT extends ESIntegTestCase { + // TODO: Add additional REST tests when metrics are exposed + public static final String INDEX_NAME = "test"; + private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build(); + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) // Need at least two threads because we are going to block one - .put("thread_pool.write.size", 2) + .put(unboundedWriteQueue) .build(); } @@ -78,28 +89,13 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); ensureGreen(INDEX_NAME); - IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get(); - String primaryId = Stream.of(response.getShards()) - .map(ShardStats::getShardRouting) - .filter(ShardRouting::primary) - .findAny() - .get() - .currentNodeId(); - String replicaId = Stream.of(response.getShards()) - .map(ShardStats::getShardRouting) - .filter(sr -> sr.primary() == false) - .findAny() - .get() - .currentNodeId(); - DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes(); - String primaryName = nodes.get(primaryId).getName(); - String replicaName = nodes.get(replicaId).getName(); - String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName(); + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); final CountDownLatch replicationSendPointReached = new CountDownLatch(1); final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1); - final CountDownLatch newActionsSendPointReached = new CountDownLatch(2); - final CountDownLatch latchBlockingReplication = new CountDownLatch(1); TransportService primaryService = internalCluster().getInstance(TransportService.class, primaryName); final MockTransportService primaryTransportService = (MockTransportService) primaryService; @@ -118,6 +114,9 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { connection.sendRequest(requestId, action, request, options); }); + final ThreadPool replicaThreadPool = replicaTransportService.getThreadPool(); + final Releasable replicaRelease = blockReplicas(replicaThreadPool); + final BulkRequest bulkRequest = new BulkRequest(); int totalRequestSize = 0; for (int i = 0; i < 80; ++i) { @@ -146,25 +145,6 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); - ThreadPool replicaThreadPool = replicaTransportService.getThreadPool(); - // Block the replica Write thread pool - replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { - try { - newActionsSendPointReached.countDown(); - latchBlockingReplication.await(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - }); - replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { - try { - newActionsSendPointReached.countDown(); - latchBlockingReplication.await(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - }); - newActionsSendPointReached.await(); latchBlockingReplicationSend.countDown(); IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) @@ -195,7 +175,7 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); - latchBlockingReplication.countDown(); + replicaRelease.close(); successFuture.actionGet(); secondFuture.actionGet(); @@ -210,16 +190,224 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { if (replicationSendPointReached.getCount() > 0) { replicationSendPointReached.countDown(); } - while (newActionsSendPointReached.getCount() > 0) { - newActionsSendPointReached.countDown(); - } + replicaRelease.close(); if (latchBlockingReplicationSend.getCount() > 0) { latchBlockingReplicationSend.countDown(); } - if (latchBlockingReplication.getCount() > 0) { - latchBlockingReplication.countDown(); - } + replicaRelease.close(); primaryTransportService.clearAllRules(); } } + + public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { + final BulkRequest bulkRequest = new BulkRequest(); + int totalRequestSize = 0; + for (int i = 0; i < 80; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + totalRequestSize += request.ramBytesUsed(); + assertTrue(request.ramBytesUsed() > request.source().length()); + bulkRequest.add(request); + } + + final long bulkRequestSize = bulkRequest.ramBytesUsed(); + final long bulkShardRequestSize = totalRequestSize; + restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), + (long)(bulkShardRequestSize * 1.5) + "B").build()); + + assertAcked(prepareCreate(INDEX_NAME, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); + ensureGreen(INDEX_NAME); + + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + + final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName); + try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) { + final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); + + WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); + WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); + WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + + assertBusy(() -> { + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + }); + + expectThrows(EsRejectedExecutionException.class, () -> { + if (randomBoolean()) { + client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + } else if (randomBoolean()) { + client(primaryName).bulk(bulkRequest).actionGet(); + } else { + client(replicaName).bulk(bulkRequest).actionGet(); + } + }); + + replicaRelease.close(); + + successFuture.actionGet(); + + assertEquals(0, primaryWriteLimits.getWriteBytes()); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + } + } + + public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { + final BulkRequest bulkRequest = new BulkRequest(); + int totalRequestSize = 0; + for (int i = 0; i < 80; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + totalRequestSize += request.ramBytesUsed(); + assertTrue(request.ramBytesUsed() > request.source().length()); + bulkRequest.add(request); + } + final long bulkShardRequestSize = totalRequestSize; + restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), + (long)(bulkShardRequestSize * 1.5) + "B").build()); + + assertAcked(prepareCreate(INDEX_NAME, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); + ensureGreen(INDEX_NAME); + + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + + final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName); + try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) { + final ActionFuture successFuture = client(primaryName).bulk(bulkRequest); + + WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); + WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); + WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + + assertBusy(() -> { + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + }); + + BulkResponse responses = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertTrue(responses.hasFailures()); + assertThat(responses.getItems()[0].getFailure().getCause().getCause(), instanceOf(EsRejectedExecutionException.class)); + + replicaRelease.close(); + + successFuture.actionGet(); + + assertEquals(0, primaryWriteLimits.getWriteBytes()); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + } + } + + public void testWritesWillSucceedIfBelowThreshold() throws Exception { + restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), "1MB").build()); + assertAcked(prepareCreate(INDEX_NAME, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); + ensureGreen(INDEX_NAME); + + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + + final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName); + try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) { + // The write limits is set to 1MB. We will send up to 800KB to stay below that threshold. + int thresholdToStopSending = 800 * 1024; + + ArrayList> responses = new ArrayList<>(); + int totalRequestSize = 0; + while (totalRequestSize < thresholdToStopSending) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(500))); + totalRequestSize += request.ramBytesUsed(); + responses.add(client(coordinatingOnlyNode).index(request)); + } + + replicaRelease.close(); + + // Would throw exception if one of the operations was rejected + responses.forEach(ActionFuture::actionGet); + } + } + + private void restartNodesWithSettings(Settings settings) throws Exception { + internalCluster().fullRestart(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) { + return Settings.builder().put(unboundedWriteQueue).put(settings).build(); + } + }); + } + + private String getCoordinatingOnlyNode() { + return client().admin().cluster().prepareState().get().getState().nodes().getCoordinatingOnlyNodes().iterator().next() + .value.getName(); + } + + private Tuple getPrimaryReplicaNodeNames() { + IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get(); + String primaryId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(ShardRouting::primary) + .findAny() + .get() + .currentNodeId(); + String replicaId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(sr -> sr.primary() == false) + .findAny() + .get() + .currentNodeId(); + DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes(); + String primaryName = nodes.get(primaryId).getName(); + String replicaName = nodes.get(replicaId).getName(); + return new Tuple<>(primaryName, replicaName); + } + + private Releasable blockReplicas(ThreadPool threadPool) { + final CountDownLatch blockReplication = new CountDownLatch(1); + final int threads = threadPool.info(ThreadPool.Names.WRITE).getMax(); + final CountDownLatch pointReached = new CountDownLatch(threads); + for (int i = 0; i< threads; ++i) { + threadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + pointReached.countDown(); + blockReplication.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + } + + return () -> { + if (blockReplication.getCount() > 0) { + blockReplication.countDown(); + } + }; + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java index 29371f07950..129d72abdcf 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java @@ -20,26 +20,70 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; public class WriteMemoryLimits { + public static final Setting MAX_INDEXING_BYTES = + Setting.memorySizeSetting("indexing_limits.memory.limit", "10%", Setting.Property.NodeScope); + private final AtomicLong writeBytes = new AtomicLong(0); private final AtomicLong replicaWriteBytes = new AtomicLong(0); + private final long writeLimits; + + public WriteMemoryLimits(Settings settings) { + this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); + } + + public WriteMemoryLimits(Settings settings, ClusterSettings clusterSettings) { + this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); + } public Releasable markWriteOperationStarted(long bytes) { - writeBytes.addAndGet(bytes); - return () -> writeBytes.getAndAdd(-bytes); + return markWriteOperationStarted(bytes, false); + } + + public Releasable markWriteOperationStarted(long bytes, boolean forceExecution) { + long currentWriteLimits = this.writeLimits; + long writeBytes = this.writeBytes.addAndGet(bytes); + long replicaWriteBytes = this.replicaWriteBytes.get(); + long totalBytes = writeBytes + replicaWriteBytes; + if (forceExecution == false && totalBytes > currentWriteLimits) { + long bytesWithoutOperation = writeBytes - bytes; + long totalBytesWithoutOperation = totalBytes - bytes; + this.writeBytes.getAndAdd(-bytes); + throw new EsRejectedExecutionException("rejected execution of write operation [" + + "write_bytes=" + bytesWithoutOperation + ", " + + "replica_write_bytes=" + replicaWriteBytes + ", " + + "total_write_bytes=" + totalBytesWithoutOperation + ", " + + "current_operation_bytes=" + bytes + ", " + + "max_write_bytes=" + currentWriteLimits + "]", false); + } + return () -> this.writeBytes.getAndAdd(-bytes); } public long getWriteBytes() { return writeBytes.get(); } - public Releasable markReplicaWriteStarted(long bytes) { - replicaWriteBytes.getAndAdd(bytes); - return () -> replicaWriteBytes.getAndAdd(-bytes); + public Releasable markReplicaWriteStarted(long bytes, boolean forceExecution) { + long currentReplicaWriteLimits = (long) (this.writeLimits * 1.5); + long replicaWriteBytes = this.replicaWriteBytes.getAndAdd(bytes); + if (forceExecution == false && replicaWriteBytes > currentReplicaWriteLimits) { + long replicaBytesWithoutOperation = replicaWriteBytes - bytes; + this.replicaWriteBytes.getAndAdd(-bytes); + throw new EsRejectedExecutionException("rejected execution of replica write operation [" + + "replica_write_bytes=" + replicaBytesWithoutOperation + ", " + + "current_replica_operation_bytes=" + bytes + ", " + + "max_replica_write_bytes=" + currentReplicaWriteLimits + "]", false); + } + return () -> this.replicaWriteBytes.getAndAdd(-bytes); } public long getReplicaWriteBytes() { diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index d63abe205a4..d3280a1adc9 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -805,6 +805,6 @@ public class IndexRequest extends ReplicatedWriteRequest implement @Override public long ramBytesUsed() { - return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.ramBytesUsed()); + return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.length()); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 5211a446620..c9e335a3611 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -60,7 +60,7 @@ public abstract class TransportWriteAction< Response extends ReplicationResponse & WriteResponse > extends TransportReplicationAction { - private final boolean forceExecutionOnPrimary; + private final boolean forceExecution; private final WriteMemoryLimits writeMemoryLimits; private final String executor; @@ -74,13 +74,13 @@ public abstract class TransportWriteAction< super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); this.executor = executor; - this.forceExecutionOnPrimary = forceExecutionOnPrimary; + this.forceExecution = forceExecutionOnPrimary; this.writeMemoryLimits = writeMemoryLimits; } @Override protected Releasable checkOperationLimits(Request request) { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution); } @Override @@ -90,7 +90,7 @@ public abstract class TransportWriteAction< if (rerouteWasLocal) { return () -> {}; } else { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution); } } @@ -100,7 +100,7 @@ public abstract class TransportWriteAction< @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request)); + return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request), forceExecution); } protected long replicaOperationSize(ReplicaRequest request) { @@ -156,7 +156,7 @@ public abstract class TransportWriteAction< @Override public boolean isForceExecution() { - return forceExecutionOnPrimary; + return forceExecution; } }); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 4a204f607bd..844fba08ad1 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.settings; import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; @@ -554,7 +555,8 @@ public final class ClusterSettings extends AbstractScopedSettings { DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING, FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, - FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING))); + FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, + WriteMemoryLimits.MAX_INDEXING_BYTES))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER, diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index e23fdc32031..53f517ddfec 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -595,7 +595,7 @@ public class Node implements Closeable { new PersistentTasksClusterService(settings, registry, clusterService, threadPool); resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); - final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(); + final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(settings); modules.add(b -> { b.bind(Node.class).toInstance(this); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 1272963a0f7..d4cb93eac6e 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -120,7 +121,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa final ExecutorService direct = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(direct); TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, - null, null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits()) { + null, null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits(Settings.EMPTY)) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses, Map indicesThatCannotBeCreated) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index b78340e2410..540e66603c5 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -142,7 +142,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { new AutoCreateIndex( SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new IndexNameExpressionResolver() - ), new WriteMemoryLimits() + ), new WriteMemoryLimits(SETTINGS) ); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 3aec4b3b471..06e5786a6eb 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -81,7 +81,7 @@ public class TransportBulkActionTests extends ESTestCase { super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null, null, new ActionFilters(Collections.emptySet()), new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()), - new WriteMemoryLimits()); + new WriteMemoryLimits(Settings.EMPTY)); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 6c3e8518cf2..b249867060b 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -240,7 +240,7 @@ public class TransportBulkActionTookTests extends ESTestCase { actionFilters, indexNameExpressionResolver, autoCreateIndex, - new WriteMemoryLimits(), + new WriteMemoryLimits(Settings.EMPTY), relativeTimeProvider); } diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 5e57eb6045f..8859651feed 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -145,7 +145,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase { final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService, clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), - new WriteMemoryLimits()); + new WriteMemoryLimits(Settings.EMPTY)); assertThat(action.globalBlockLevel(), nullValue()); assertThat(action.indexBlockLevel(), nullValue()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index c1ef04b10d4..6535ab6d68f 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -367,7 +367,7 @@ public class TransportWriteActionTests extends ESTestCase { new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new WriteMemoryLimits()); + new WriteMemoryLimits(Settings.EMPTY)); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -377,7 +377,7 @@ public class TransportWriteActionTests extends ESTestCase { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new WriteMemoryLimits()); + new WriteMemoryLimits(settings)); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 6c77999191f..a037f79957c 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -105,7 +105,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits()); + new WriteMemoryLimits(Settings.EMPTY)); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); action.dispatchedShardOperationOnPrimary(request, indexShard, @@ -142,7 +142,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits()); + new WriteMemoryLimits(Settings.EMPTY)); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); @@ -182,7 +182,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits()); + new WriteMemoryLimits(Settings.EMPTY)); assertNull(action.indexBlockLevel()); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 6f1815286de..e570c35f1ce 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1483,7 +1483,7 @@ public class SnapshotResiliencyTests extends ESTestCase { threadPool, shardStateAction, actionFilters, - new WriteMemoryLimits())), + new WriteMemoryLimits(settings))), new GlobalCheckpointSyncAction( settings, transportService, @@ -1509,7 +1509,7 @@ public class SnapshotResiliencyTests extends ESTestCase { mappingUpdatedAction.setClient(client); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), - actionFilters, new WriteMemoryLimits()); + actionFilters, new WriteMemoryLimits(settings)); actions.put(BulkAction.INSTANCE, new TransportBulkAction(threadPool, transportService, clusterService, new IngestService( @@ -1517,7 +1517,7 @@ public class SnapshotResiliencyTests extends ESTestCase { new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), Collections.emptyList(), client), transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, - new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new WriteMemoryLimits() + new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new WriteMemoryLimits(settings) )); final RestoreService restoreService = new RestoreService( clusterService, repositoriesService, allocationService,