From 623df95a3231cff0e4d50542e16a355f6e2a6211 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 13 Jul 2020 18:23:42 -0500 Subject: [PATCH] Adding indexing pressure stats to node stats API (#59467) We have recently added internal metrics to monitor the amount of indexing occurring on a node. These metrics introduce back pressure to indexing when memory utilization is too high. This commit exposes these stats through the node stats API. --- .../http/IndexingPressureRestIT.java | 129 ++++++++++++++++++ .../rest-api-spec/api/nodes.stats.json | 12 +- .../test/nodes.stats/50_indexing_pressure.yml | 28 ++++ .../IndexingPressureIT.java} | 119 ++++++++-------- .../admin/cluster/node/stats/NodeStats.java | 24 +++- .../cluster/node/stats/NodesStatsRequest.java | 3 +- .../node/stats/TransportNodesStatsAction.java | 3 +- .../stats/TransportClusterStatsAction.java | 2 +- .../action/bulk/TransportBulkAction.java | 13 +- .../action/bulk/TransportShardBulkAction.java | 5 +- .../action/bulk/WriteMemoryLimits.java | 92 ------------- .../TransportResyncReplicationAction.java | 6 +- .../replication/TransportWriteAction.java | 14 +- .../common/settings/ClusterSettings.java | 4 +- .../elasticsearch/index/IndexingPressure.java | 111 +++++++++++++++ .../index/seqno/RetentionLeaseSyncAction.java | 6 +- .../index/stats/IndexingPressureStats.java | 85 ++++++++++++ .../java/org/elasticsearch/node/Node.java | 8 +- .../org/elasticsearch/node/NodeService.java | 11 +- .../cluster/node/stats/NodeStatsTests.java | 2 +- ...ActionIndicesThatCannotBeCreatedTests.java | 3 +- .../bulk/TransportBulkActionIngestTests.java | 3 +- .../action/bulk/TransportBulkActionTests.java | 3 +- .../bulk/TransportBulkActionTookTests.java | 3 +- ...TransportResyncReplicationActionTests.java | 4 +- .../TransportWriteActionTests.java | 6 +- .../elasticsearch/cluster/DiskUsageTests.java | 12 +- .../seqno/RetentionLeaseSyncActionTests.java | 8 +- .../snapshots/SnapshotResiliencyTests.java | 8 +- .../MockInternalClusterInfoService.java | 3 +- .../test/InternalTestCluster.java | 10 +- .../xpack/ccr/LocalIndexFollowingIT.java | 6 +- .../TransportBulkShardOperationsAction.java | 12 +- .../ml/MachineLearningFeatureSetTests.java | 2 +- ...sportGetTrainedModelsStatsActionTests.java | 2 +- .../node/NodeStatsMonitoringDocTests.java | 3 +- 36 files changed, 535 insertions(+), 230 deletions(-) create mode 100644 qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml rename server/src/internalClusterTest/java/org/elasticsearch/{action/bulk/WriteMemoryLimitsIT.java => index/IndexingPressureIT.java} (74%) delete mode 100644 server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java create mode 100644 server/src/main/java/org/elasticsearch/index/IndexingPressure.java create mode 100644 server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java new file mode 100644 index 00000000000..b096107e0a3 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.XContentTestUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; + +import static org.elasticsearch.rest.RestStatus.CREATED; +import static org.elasticsearch.rest.RestStatus.OK; +import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + +/** + * Test Indexing Pressure Metrics and Statistics + */ +@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) +public class IndexingPressureRestIT extends HttpSmokeTestCase { + + private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build(); + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB") + .put(unboundedWriteQueue) + .build(); + } + + @SuppressWarnings("unchecked") + public void testIndexingPressureStats() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity("{\"settings\": {\"index\": {\"number_of_shards\": 1, \"number_of_replicas\": 1, " + + "\"write.wait_for_active_shards\": 2}}}"); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request successfulIndexingRequest = new Request("POST", "/index_name/_doc/"); + successfulIndexingRequest.setJsonEntity("{\"x\": \"small text\"}"); + final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(CREATED.getStatus())); + + Request getNodeStats = new Request("GET", "/_nodes/stats/indexing_pressure"); + final Response nodeStats = getRestClient().performRequest(getNodeStats); + Map nodeStatsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats.getEntity().getContent(), true); + ArrayList values = new ArrayList<>(((Map) nodeStatsMap.get("nodes")).values()); + assertThat(values.size(), equalTo(2)); + XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map) values.get(0)); + Integer node1IndexingBytes = node1.get("indexing_pressure.total.coordinating_and_primary_bytes"); + Integer node1ReplicaBytes = node1.get("indexing_pressure.total.replica_bytes"); + Integer node1Rejections = node1.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map) values.get(1)); + Integer node2IndexingBytes = node2.get("indexing_pressure.total.coordinating_and_primary_bytes"); + Integer node2ReplicaBytes = node2.get("indexing_pressure.total.replica_bytes"); + Integer node2Rejections = node2.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + + if (node1IndexingBytes == 0) { + assertThat(node2IndexingBytes, greaterThan(0)); + assertThat(node2IndexingBytes, lessThan(1024)); + } else { + assertThat(node1IndexingBytes, greaterThan(0)); + assertThat(node1IndexingBytes, lessThan(1024)); + } + + if (node1ReplicaBytes == 0) { + assertThat(node2ReplicaBytes, greaterThan(0)); + assertThat(node2ReplicaBytes, lessThan(1024)); + } else { + assertThat(node2ReplicaBytes, equalTo(0)); + assertThat(node1ReplicaBytes, lessThan(1024)); + } + + assertThat(node1Rejections, equalTo(0)); + assertThat(node2Rejections, equalTo(0)); + + Request failedIndexingRequest = new Request("POST", "/index_name/_doc/"); + String largeString = randomAlphaOfLength(10000); + failedIndexingRequest.setJsonEntity("{\"x\": " + largeString + "}"); + ResponseException exception = expectThrows(ResponseException.class, () -> getRestClient().performRequest(failedIndexingRequest)); + assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(TOO_MANY_REQUESTS.getStatus())); + + Request getNodeStats2 = new Request("GET", "/_nodes/stats/indexing_pressure"); + final Response nodeStats2 = getRestClient().performRequest(getNodeStats2); + Map nodeStatsMap2 = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats2.getEntity().getContent(), + true); + ArrayList values2 = new ArrayList<>(((Map) nodeStatsMap2.get("nodes")).values()); + assertThat(values2.size(), equalTo(2)); + XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map) values2.get(0)); + node1Rejections = node1AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map) values2.get(1)); + node2Rejections = node2AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + + if (node1Rejections == 0) { + assertThat(node2Rejections, equalTo(1)); + } else { + assertThat(node1Rejections, equalTo(1)); + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json index 1aa57ee849c..cc1a9e81850 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json @@ -44,7 +44,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" } @@ -69,7 +70,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" }, @@ -98,7 +100,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" }, @@ -145,7 +148,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml new file mode 100644 index 00000000000..bf85c9e2ac1 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml @@ -0,0 +1,28 @@ +--- +"Indexing pressure stats": + - skip: + version: " - 7.8.99" + reason: "indexing_pressure was added in 7.9" + features: [arbitrary_key] + + - do: + nodes.info: {} + - set: + nodes._arbitrary_key_: node_id + + - do: + nodes.stats: + metric: [ indexing_pressure ] + + - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.replica_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.all_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_memory_limit_rejections: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.replica_memory_limit_rejections: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.coordinating_and_primary_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.replica_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.all_bytes: 0 } + +# TODO: +# +# Change skipped version after backport diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java similarity index 74% rename from server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java rename to server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java index b569f3c772b..dca9c59fedc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java @@ -16,11 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.action.bulk; +package org.elasticsearch.index; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -51,7 +54,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D) -public class WriteMemoryLimitsIT extends ESIntegTestCase { +public class IndexingPressureIT extends ESIntegTestCase { // TODO: Add additional REST tests when metrics are exposed @@ -63,7 +66,6 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - // Need at least two threads because we are going to block one .put(unboundedWriteQueue) .build(); } @@ -134,16 +136,16 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); replicationSendPointReached.await(); - WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); - WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName); + IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); + IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); latchBlockingReplicationSend.countDown(); @@ -165,14 +167,15 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { final long secondBulkShardRequestSize = request.ramBytesUsed(); if (usePrimaryAsCoordinatingNode) { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize)); - assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), + greaterThan(bulkShardRequestSize + secondBulkRequestSize)); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); } else { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); } - assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); - assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(), + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertBusy(() -> assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); replicaRelease.close(); @@ -180,12 +183,12 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { successFuture.actionGet(); secondFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getWriteBytes()); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } finally { if (replicationSendPointReached.getCount() > 0) { replicationSendPointReached.countDown(); @@ -212,8 +215,8 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { final long bulkRequestSize = bulkRequest.ramBytesUsed(); final long bulkShardRequestSize = totalRequestSize; - restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), - (long)(bulkShardRequestSize * 1.5) + "B").build()); + restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), + (long) (bulkShardRequestSize * 1.5) + "B").build()); assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -229,17 +232,17 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) { final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); - WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); - WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName); + IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); + IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); }); expectThrows(EsRejectedExecutionException.class, () -> { @@ -256,12 +259,12 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getWriteBytes()); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } } @@ -276,7 +279,7 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { bulkRequest.add(request); } final long bulkShardRequestSize = totalRequestSize; - restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), + restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), (long)(bulkShardRequestSize * 1.5) + "B").build()); assertAcked(prepareCreate(INDEX_NAME, Settings.builder() @@ -293,17 +296,17 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) { final ActionFuture successFuture = client(primaryName).bulk(bulkRequest); - WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); - WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName); + IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); + IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); }); BulkResponse responses = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); @@ -314,17 +317,17 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getWriteBytes()); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } } public void testWritesWillSucceedIfBelowThreshold() throws Exception { - restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), "1MB").build()); + restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1MB").build()); assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 56a177e4966..f5db990f5e3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.http.HttpStats; +import org.elasticsearch.index.stats.IndexingPressureStats; import org.elasticsearch.indices.NodeIndicesStats; import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; import org.elasticsearch.ingest.IngestStats; @@ -95,6 +96,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private AdaptiveSelectionStats adaptiveSelectionStats; + @Nullable + private IndexingPressureStats indexingPressureStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -125,6 +129,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { scriptCacheStats = scriptStats.toScriptCacheStats(); } } + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new); + } else { + indexingPressureStats = null; + } } public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices, @@ -135,7 +144,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable DiscoveryStats discoveryStats, @Nullable IngestStats ingestStats, @Nullable AdaptiveSelectionStats adaptiveSelectionStats, - @Nullable ScriptCacheStats scriptCacheStats) { + @Nullable ScriptCacheStats scriptCacheStats, + @Nullable IndexingPressureStats indexingPressureStats) { super(node); this.timestamp = timestamp; this.indices = indices; @@ -152,6 +162,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { this.ingestStats = ingestStats; this.adaptiveSelectionStats = adaptiveSelectionStats; this.scriptCacheStats = scriptCacheStats; + this.indexingPressureStats = indexingPressureStats; } public long getTimestamp() { @@ -251,6 +262,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { return scriptCacheStats; } + @Nullable + public IndexingPressureStats getIndexingPressureStats() { + return indexingPressureStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -277,6 +293,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { } if (out.getVersion().onOrAfter(Version.V_7_8_0) && out.getVersion().before(Version.V_7_9_0)) { out.writeOptionalWriteable(scriptCacheStats); } + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeOptionalWriteable(indexingPressureStats); + } } @Override @@ -343,6 +362,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { if (getScriptCacheStats() != null) { getScriptCacheStats().toXContent(builder, params); } + if (getIndexingPressureStats() != null) { + getIndexingPressureStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java index b92de67ce6c..44f07dd8243 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -224,7 +224,8 @@ public class NodesStatsRequest extends BaseNodesRequest { DISCOVERY("discovery"), INGEST("ingest"), ADAPTIVE_SELECTION("adaptive_selection"), - SCRIPT_CACHE("script_cache"); + SCRIPT_CACHE("script_cache"), + INDEXING_PRESSURE("indexing_pressure"),; private String metricName; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 6b75dac8aa0..2e868686b77 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -83,7 +83,8 @@ public class TransportNodesStatsAction extends TransportNodesAction shardsStats = new ArrayList<>(); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index fe780c00301..fa95eeba964 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -68,6 +68,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; @@ -113,23 +114,23 @@ public class TransportBulkAction extends HandledTransportAction listener) { long indexingBytes = bulkRequest.ramBytesUsed(); - final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes); + final Releasable releasable = indexingPressure.markIndexingOperationStarted(indexingBytes); final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { doInternalExecute(task, bulkRequest, releasingListener); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 53bd949dbe2..0c88af3ca7c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; @@ -91,9 +92,9 @@ public class TransportShardBulkAction extends TransportWriteAction MAX_INDEXING_BYTES = - Setting.memorySizeSetting("indexing_limits.memory.limit", "10%", Setting.Property.NodeScope); - - private final AtomicLong writeBytes = new AtomicLong(0); - private final AtomicLong replicaWriteBytes = new AtomicLong(0); - private final long writeLimits; - - public WriteMemoryLimits(Settings settings) { - this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); - } - - public WriteMemoryLimits(Settings settings, ClusterSettings clusterSettings) { - this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); - } - - public Releasable markWriteOperationStarted(long bytes) { - return markWriteOperationStarted(bytes, false); - } - - public Releasable markWriteOperationStarted(long bytes, boolean forceExecution) { - long currentWriteLimits = this.writeLimits; - long writeBytes = this.writeBytes.addAndGet(bytes); - long replicaWriteBytes = this.replicaWriteBytes.get(); - long totalBytes = writeBytes + replicaWriteBytes; - if (forceExecution == false && totalBytes > currentWriteLimits) { - long bytesWithoutOperation = writeBytes - bytes; - long totalBytesWithoutOperation = totalBytes - bytes; - this.writeBytes.getAndAdd(-bytes); - throw new EsRejectedExecutionException("rejected execution of write operation [" + - "write_bytes=" + bytesWithoutOperation + ", " + - "replica_write_bytes=" + replicaWriteBytes + ", " + - "total_write_bytes=" + totalBytesWithoutOperation + ", " + - "current_operation_bytes=" + bytes + ", " + - "max_write_bytes=" + currentWriteLimits + "]", false); - } - return () -> this.writeBytes.getAndAdd(-bytes); - } - - public long getWriteBytes() { - return writeBytes.get(); - } - - public Releasable markReplicaWriteStarted(long bytes, boolean forceExecution) { - long currentReplicaWriteLimits = (long) (this.writeLimits * 1.5); - long replicaWriteBytes = this.replicaWriteBytes.getAndAdd(bytes); - if (forceExecution == false && replicaWriteBytes > currentReplicaWriteLimits) { - long replicaBytesWithoutOperation = replicaWriteBytes - bytes; - this.replicaWriteBytes.getAndAdd(-bytes); - throw new EsRejectedExecutionException("rejected execution of replica write operation [" + - "replica_write_bytes=" + replicaBytesWithoutOperation + ", " + - "current_replica_operation_bytes=" + bytes + ", " + - "max_replica_write_bytes=" + currentReplicaWriteLimits + "]", false); - } - return () -> this.replicaWriteBytes.getAndAdd(-bytes); - } - - public long getReplicaWriteBytes() { - return replicaWriteBytes.get(); - } -} diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 74ddcf54b32..47f287ba013 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.resync; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -57,11 +57,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction extends TransportReplicationAction { private final boolean forceExecution; - private final WriteMemoryLimits writeMemoryLimits; + private final IndexingPressure indexingPressure; private final String executor; protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader request, Writeable.Reader replicaRequest, String executor, boolean forceExecutionOnPrimary, - WriteMemoryLimits writeMemoryLimits) { + IndexingPressure indexingPressure) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the // ThreadPool.Names.WRITE thread pool in this class. super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); this.executor = executor; this.forceExecution = forceExecutionOnPrimary; - this.writeMemoryLimits = writeMemoryLimits; + this.indexingPressure = indexingPressure; } @Override protected Releasable checkOperationLimits(Request request) { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution); } @Override @@ -90,7 +90,7 @@ public abstract class TransportWriteAction< if (rerouteWasLocal) { return () -> {}; } else { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution); } } @@ -100,7 +100,7 @@ public abstract class TransportWriteAction< @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request), forceExecution); + return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution); } protected long replicaOperationSize(ReplicaRequest request) { diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 844fba08ad1..1d091f56cc1 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -21,7 +21,7 @@ package org.elasticsearch.common.settings; import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; @@ -556,7 +556,7 @@ public final class ClusterSettings extends AbstractScopedSettings { FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, - WriteMemoryLimits.MAX_INDEXING_BYTES))); + IndexingPressure.MAX_INDEXING_BYTES))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER, diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java new file mode 100644 index 00000000000..9c8fb83fe4f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.stats.IndexingPressureStats; + +import java.util.concurrent.atomic.AtomicLong; + +public class IndexingPressure { + + public static final Setting MAX_INDEXING_BYTES = + Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope); + + private final AtomicLong currentCoordinatingAndPrimaryBytes = new AtomicLong(0); + private final AtomicLong currentReplicaBytes = new AtomicLong(0); + private final AtomicLong totalCoordinatingAndPrimaryBytes = new AtomicLong(0); + private final AtomicLong totalReplicaBytes = new AtomicLong(0); + private final AtomicLong coordinatingAndPrimaryRejections = new AtomicLong(0); + private final AtomicLong replicaRejections = new AtomicLong(0); + + private final long primaryAndCoordinatingLimits; + private final long replicaLimits; + + public IndexingPressure(Settings settings) { + this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); + this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5); + } + + public Releasable markIndexingOperationStarted(long bytes) { + return markIndexingOperationStarted(bytes, false); + } + + public Releasable markIndexingOperationStarted(long bytes, boolean forceExecution) { + long writeBytes = this.currentCoordinatingAndPrimaryBytes.addAndGet(bytes); + long replicaWriteBytes = this.currentReplicaBytes.get(); + long totalBytes = writeBytes + replicaWriteBytes; + if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) { + long bytesWithoutOperation = writeBytes - bytes; + long totalBytesWithoutOperation = totalBytes - bytes; + this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + this.coordinatingAndPrimaryRejections.getAndIncrement(); + throw new EsRejectedExecutionException("rejected execution of operation [" + + "coordinating_and_primary_bytes=" + bytesWithoutOperation + ", " + + "replica_bytes=" + replicaWriteBytes + ", " + + "all_bytes=" + totalBytesWithoutOperation + ", " + + "operation_bytes=" + bytes + ", " + + "max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false); + } + totalCoordinatingAndPrimaryBytes.getAndAdd(bytes); + return () -> this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + } + + public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) { + long replicaWriteBytes = this.currentReplicaBytes.getAndAdd(bytes); + if (forceExecution == false && replicaWriteBytes > replicaLimits) { + long replicaBytesWithoutOperation = replicaWriteBytes - bytes; + this.currentReplicaBytes.getAndAdd(-bytes); + this.replicaRejections.getAndIncrement(); + throw new EsRejectedExecutionException("rejected execution of replica operation [" + + "replica_bytes=" + replicaBytesWithoutOperation + ", " + + "replica_operation_bytes=" + bytes + ", " + + "max_replica_bytes=" + replicaLimits + "]", false); + } + totalReplicaBytes.getAndAdd(bytes); + return () -> this.currentReplicaBytes.getAndAdd(-bytes); + } + + public long getCurrentCoordinatingAndPrimaryBytes() { + return currentCoordinatingAndPrimaryBytes.get(); + } + + public long getCurrentReplicaBytes() { + return currentReplicaBytes.get(); + } + + public long getTotalCoordinatingAndPrimaryBytes() { + return totalCoordinatingAndPrimaryBytes.get(); + } + + public long getTotalReplicaBytes() { + return totalReplicaBytes.get(); + } + + public IndexingPressureStats stats() { + return new IndexingPressureStats(totalCoordinatingAndPrimaryBytes.get(), totalReplicaBytes.get(), + currentCoordinatingAndPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingAndPrimaryRejections.get(), + replicaRejections.get()); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 54a418fe673..dd08f8ff763 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -25,7 +25,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteResponse; @@ -80,7 +80,7 @@ public class RetentionLeaseSyncAction extends final ThreadPool threadPool, final ShardStateAction shardStateAction, final ActionFilters actionFilters, - final WriteMemoryLimits writeMemoryLimits) { + final IndexingPressure indexingPressure) { super( settings, ACTION_NAME, @@ -92,7 +92,7 @@ public class RetentionLeaseSyncAction extends actionFilters, RetentionLeaseSyncAction.Request::new, RetentionLeaseSyncAction.Request::new, - ThreadPool.Names.MANAGEMENT, false, writeMemoryLimits); + ThreadPool.Names.MANAGEMENT, false, indexingPressure); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java new file mode 100644 index 00000000000..309cf863b63 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +public class IndexingPressureStats implements Writeable, ToXContentFragment { + + private final long totalCoordinatingAndPrimaryBytes; + private final long totalReplicaBytes; + private final long currentCoordinatingAndPrimaryBytes; + private final long currentReplicaBytes; + private final long coordinatingAndPrimaryRejections; + private final long replicaRejections; + + public IndexingPressureStats(StreamInput in) throws IOException { + totalCoordinatingAndPrimaryBytes = in.readVLong(); + totalReplicaBytes = in.readVLong(); + currentCoordinatingAndPrimaryBytes = in.readVLong(); + currentReplicaBytes = in.readVLong(); + coordinatingAndPrimaryRejections = in.readVLong(); + replicaRejections = in.readVLong(); + } + + public IndexingPressureStats(long totalCoordinatingAndPrimaryBytes, long totalReplicaBytes, long currentCoordinatingAndPrimaryBytes, + long currentReplicaBytes, long coordinatingAndPrimaryRejections, long replicaRejections) { + this.totalCoordinatingAndPrimaryBytes = totalCoordinatingAndPrimaryBytes; + this.totalReplicaBytes = totalReplicaBytes; + this.currentCoordinatingAndPrimaryBytes = currentCoordinatingAndPrimaryBytes; + this.currentReplicaBytes = currentReplicaBytes; + this.coordinatingAndPrimaryRejections = coordinatingAndPrimaryRejections; + this.replicaRejections = replicaRejections; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalCoordinatingAndPrimaryBytes); + out.writeVLong(totalReplicaBytes); + out.writeVLong(currentCoordinatingAndPrimaryBytes); + out.writeVLong(currentReplicaBytes); + out.writeVLong(coordinatingAndPrimaryRejections); + out.writeVLong(replicaRejections); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("indexing_pressure"); + builder.startObject("total"); + builder.field("coordinating_and_primary_bytes", totalCoordinatingAndPrimaryBytes); + builder.field("replica_bytes", totalReplicaBytes); + builder.field("all_bytes", totalReplicaBytes + totalCoordinatingAndPrimaryBytes); + builder.field("coordinating_and_primary_memory_limit_rejections", coordinatingAndPrimaryRejections); + builder.field("replica_memory_limit_rejections", replicaRejections); + builder.endObject(); + builder.startObject("current"); + builder.field("coordinating_and_primary_bytes", currentCoordinatingAndPrimaryBytes); + builder.field("replica_bytes", currentReplicaBytes); + builder.field("all_bytes", currentCoordinatingAndPrimaryBytes + currentReplicaBytes); + builder.endObject(); + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 53f517ddfec..2c4c025a91e 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -31,7 +31,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchTransportService; @@ -547,6 +547,7 @@ public class Node implements Closeable { final SearchTransportService searchTransportService = new SearchTransportService(transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final HttpServerTransport httpServerTransport = newHttpTransport(networkModule); + final IndexingPressure indexingLimits = new IndexingPressure(settings); final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment, @@ -577,7 +578,7 @@ public class Node implements Closeable { this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptService, httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, - searchTransportService); + searchTransportService, indexingLimits); final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, searchModule.getFetchPhase(), @@ -595,7 +596,6 @@ public class Node implements Closeable { new PersistentTasksClusterService(settings, registry, clusterService, threadPool); resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); - final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(settings); modules.add(b -> { b.bind(Node.class).toInstance(this); @@ -614,7 +614,7 @@ public class Node implements Closeable { b.bind(ScriptService.class).toInstance(scriptService); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); - b.bind(WriteMemoryLimits.class).toInstance(bulkIndexingLimits); + b.bind(IndexingPressure.class).toInstance(indexingLimits); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/elasticsearch/node/NodeService.java b/server/src/main/java/org/elasticsearch/node/NodeService.java index 98e1f18dff2..f72923dee98 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeService.java +++ b/server/src/main/java/org/elasticsearch/node/NodeService.java @@ -19,6 +19,7 @@ package org.elasticsearch.node; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Build; import org.elasticsearch.Version; @@ -59,6 +60,7 @@ public class NodeService implements Closeable { private final HttpServerTransport httpServerTransport; private final ResponseCollectorService responseCollectorService; private final SearchTransportService searchTransportService; + private final IndexingPressure indexingPressure; private final Discovery discovery; @@ -67,7 +69,7 @@ public class NodeService implements Closeable { CircuitBreakerService circuitBreakerService, ScriptService scriptService, @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService, - SearchTransportService searchTransportService) { + SearchTransportService searchTransportService, IndexingPressure indexingPressure) { this.settings = settings; this.threadPool = threadPool; this.monitorService = monitorService; @@ -82,6 +84,7 @@ public class NodeService implements Closeable { this.scriptService = scriptService; this.responseCollectorService = responseCollectorService; this.searchTransportService = searchTransportService; + this.indexingPressure = indexingPressure; clusterService.addStateApplier(ingestService); } @@ -103,7 +106,8 @@ public class NodeService implements Closeable { public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean fs, boolean transport, boolean http, boolean circuitBreaker, - boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache) { + boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache, + boolean indexingPressure) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) return new NodeStats(transportService.getLocalNode(), System.currentTimeMillis(), @@ -120,7 +124,8 @@ public class NodeService implements Closeable { discoveryStats ? discovery.stats() : null, ingest ? ingestService.stats() : null, adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null, - scriptCache ? scriptService.cacheStats() : null + scriptCache ? scriptService.cacheStats() : null, + indexingPressure ? this.indexingPressure.stats() : null ); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 4c0dc158c8e..f53115f4f10 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -518,7 +518,7 @@ public class NodeStatsTests extends ESTestCase { //TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats, fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, - ingestStats, adaptiveSelectionStats, scriptCacheStats); + ingestStats, adaptiveSelectionStats, scriptCacheStats, null); } private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index d4cb93eac6e..b29fb73bf24 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -121,7 +122,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa final ExecutorService direct = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(direct); TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, - null, null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits(Settings.EMPTY)) { + null, null, null, mock(ActionFilters.class), null, null, new IndexingPressure(Settings.EMPTY)) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses, Map indicesThatCannotBeCreated) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 540e66603c5..d0b184afdf7 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -142,7 +143,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { new AutoCreateIndex( SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new IndexNameExpressionResolver() - ), new WriteMemoryLimits(SETTINGS) + ), new IndexingPressure(SETTINGS) ); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 06e5786a6eb..2b192e10a0a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -81,7 +82,7 @@ public class TransportBulkActionTests extends ESTestCase { super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null, null, new ActionFilters(Collections.emptySet()), new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index b249867060b..26c46feb871 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.rest.action.document.RestBulkAction; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -240,7 +241,7 @@ public class TransportBulkActionTookTests extends ESTestCase { actionFilters, indexNameExpressionResolver, autoCreateIndex, - new WriteMemoryLimits(Settings.EMPTY), + new IndexingPressure(Settings.EMPTY), relativeTimeProvider); } diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 8859651feed..e3b16b43fe3 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.resync; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; @@ -145,7 +145,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase { final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService, clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); assertThat(action.globalBlockLevel(), nullValue()); assertThat(action.indexBlockLevel(), nullValue()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 6535ab6d68f..68ac15cba23 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -21,7 +21,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; @@ -367,7 +367,7 @@ public class TransportWriteActionTests extends ESTestCase { new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -377,7 +377,7 @@ public class TransportWriteActionTests extends ESTestCase { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new WriteMemoryLimits(settings)); + new IndexingPressure(settings)); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index a5674994b61..de577364e4d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -153,13 +153,13 @@ public class DiskUsageTests extends ESTestCase { List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null, - null), + null, null), new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null, - null), + null, null), new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null, - null) + null, null) ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages); DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1"); @@ -197,13 +197,13 @@ public class DiskUsageTests extends ESTestCase { List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null, - null), + null, null), new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null, - null), + null, null), new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null, - null) + null, null) ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages); DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1"); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index a037f79957c..1c7c24f429e 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; @@ -105,7 +105,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); action.dispatchedShardOperationOnPrimary(request, indexShard, @@ -142,7 +142,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); @@ -182,7 +182,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); assertNull(action.indexBlockLevel()); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index e570c35f1ce..e002fc197b2 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -65,7 +65,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAct import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction; import org.elasticsearch.action.bulk.BulkAction; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; @@ -1483,7 +1483,7 @@ public class SnapshotResiliencyTests extends ESTestCase { threadPool, shardStateAction, actionFilters, - new WriteMemoryLimits(settings))), + new IndexingPressure(settings))), new GlobalCheckpointSyncAction( settings, transportService, @@ -1509,7 +1509,7 @@ public class SnapshotResiliencyTests extends ESTestCase { mappingUpdatedAction.setClient(client); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), - actionFilters, new WriteMemoryLimits(settings)); + actionFilters, new IndexingPressure(settings)); actions.put(BulkAction.INSTANCE, new TransportBulkAction(threadPool, transportService, clusterService, new IngestService( @@ -1517,7 +1517,7 @@ public class SnapshotResiliencyTests extends ESTestCase { new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), Collections.emptyList(), client), transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, - new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new WriteMemoryLimits(settings) + new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new IndexingPressure(settings) )); final RestoreService restoreService = new RestoreService( clusterService, repositoriesService, allocationService, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 9b0181f3550..9ce0310edff 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -84,7 +84,8 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService { .map(fsInfoPath -> diskUsageFunction.apply(discoveryNode, fsInfoPath)) .toArray(FsInfo.Path[]::new)), nodeStats.getTransport(), nodeStats.getHttp(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(), - nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getScriptCacheStats()); + nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getScriptCacheStats(), + nodeStats.getIndexingPressureStats()); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c3414ea624d..006303004b2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -37,7 +37,7 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExc import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; @@ -1350,13 +1350,13 @@ public final class InternalTestCluster extends TestCluster { private void assertAllPendingWriteLimitsReleased() throws Exception { assertBusy(() -> { for (NodeAndClient nodeAndClient : nodes.values()) { - WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name); - final long writeBytes = writeMemoryLimits.getWriteBytes(); + IndexingPressure indexingPressure = getInstance(IndexingPressure.class, nodeAndClient.name); + final long writeBytes = indexingPressure.getCurrentCoordinatingAndPrimaryBytes(); if (writeBytes > 0) { throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); } - final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes(); + final long replicaWriteBytes = indexingPressure.getCurrentReplicaBytes(); if (replicaWriteBytes > 0) { throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); @@ -2497,7 +2497,7 @@ public final class InternalTestCluster extends TestCluster { NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node); CommonStatsFlags flags = new CommonStatsFlags(Flag.FieldData, Flag.QueryCache, Flag.Segments); NodeStats stats = nodeService.stats(flags, - false, false, false, false, false, false, false, false, false, false, false, false, false); + false, false, false, false, false, false, false, false, false, false, false, false, false, false); assertThat("Fielddata size must be 0 on node: " + stats.getNode(), stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); assertThat("Query cache size must be 0 on node: " + stats.getNode(), diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index ca5a2b268b7..60fc3a1814c 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,7 +6,7 @@ package org.elasticsearch.xpack.ccr; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -136,12 +136,12 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); client().execute(PutFollowAction.INSTANCE, followRequest).get(); - WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class); + IndexingPressure memoryLimits = getInstanceFromNode(IndexingPressure.class); final long finalSourceSize = sourceSize; assertBusy(() -> { // The actual write bytes will be greater due to other request fields. However, this test is // just spot checking that the bytes are incremented at all. - assertTrue(memoryLimits.getWriteBytes() > finalSourceSize); + assertTrue(memoryLimits.getCurrentCoordinatingAndPrimaryBytes() > finalSourceSize); }); blocker.countDown(); assertBusy(() -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index c0a2db2d3b4..fa206bcca26 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -9,7 +9,7 @@ package org.elasticsearch.xpack.ccr.action.bulk; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -38,7 +38,7 @@ import java.util.List; public class TransportBulkShardOperationsAction extends TransportWriteAction { - private final WriteMemoryLimits writeMemoryLimits; + private final IndexingPressure indexingPressure; @Inject public TransportBulkShardOperationsAction( @@ -49,7 +49,7 @@ public class TransportBulkShardOperationsAction final ThreadPool threadPool, final ShardStateAction shardStateAction, final ActionFilters actionFilters, - final WriteMemoryLimits writeMemoryLimits) { + final IndexingPressure indexingPressure) { super( settings, BulkShardOperationsAction.NAME, @@ -61,14 +61,14 @@ public class TransportBulkShardOperationsAction actionFilters, BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, - ThreadPool.Names.WRITE, false, writeMemoryLimits); - this.writeMemoryLimits = writeMemoryLimits; + ThreadPool.Names.WRITE, false, indexingPressure); + this.indexingPressure = indexingPressure; } @Override protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener listener) { // This is executed on the follower coordinator node and we need to mark the bytes. - Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + Releasable releasable = indexingPressure.markIndexingOperationStarted(primaryOperationSize(request)); ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { super.doExecute(task, request, releasingListener); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index a0ea1222d12..2d1f99e32b7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -641,7 +641,7 @@ public class MachineLearningFeatureSetTests extends ESTestCase { IntStream.range(0, pipelineNames.size()).boxed().collect(Collectors.toMap(pipelineNames::get, processorStats::get))); return new NodeStats(mock(DiscoveryNode.class), Instant.now().toEpochMilli(), null, null, null, null, null, null, null, null, - null, null, null, ingestStats, null, null); + null, null, null, ingestStats, null, null, null); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index 33630ec4634..b090efdbffc 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -300,7 +300,7 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase { IntStream.range(0, pipelineids.size()).boxed().collect(Collectors.toMap(pipelineids::get, processorStats::get))); return new NodeStats(mock(DiscoveryNode.class), Instant.now().toEpochMilli(), null, null, null, null, null, null, null, null, - null, null, null, ingestStats, null, null); + null, null, null, ingestStats, null, null, null); } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index a11ad4a325c..99f5f188cf4 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -366,6 +366,7 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa emptySet(), Version.V_6_0_0_beta1); - return new NodeStats(discoveryNode, no, indices, os, process, jvm, threadPool, fs, null, null, null, null, null, null, null, null); + return new NodeStats(discoveryNode, no, indices, os, process, jvm, threadPool, fs, null, null, null, null, null, null, null, null, + null); } }