Adding indexing pressure stats to node stats API (#59467)
We have recently added internal metrics to monitor the amount of indexing occurring on a node. These metrics introduce back pressure to indexing when memory utilization is too high. This commit exposes these stats through the node stats API.
This commit is contained in:
parent
dc7d4c615c
commit
623df95a32
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.XContentTestUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.rest.RestStatus.CREATED;
|
||||
import static org.elasticsearch.rest.RestStatus.OK;
|
||||
import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
|
||||
/**
|
||||
* Test Indexing Pressure Metrics and Statistics
|
||||
*/
|
||||
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
|
||||
public class IndexingPressureRestIT extends HttpSmokeTestCase {
|
||||
|
||||
private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build();
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
|
||||
.put(unboundedWriteQueue)
|
||||
.build();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testIndexingPressureStats() throws IOException {
|
||||
Request createRequest = new Request("PUT", "/index_name");
|
||||
createRequest.setJsonEntity("{\"settings\": {\"index\": {\"number_of_shards\": 1, \"number_of_replicas\": 1, " +
|
||||
"\"write.wait_for_active_shards\": 2}}}");
|
||||
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
|
||||
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||
|
||||
Request successfulIndexingRequest = new Request("POST", "/index_name/_doc/");
|
||||
successfulIndexingRequest.setJsonEntity("{\"x\": \"small text\"}");
|
||||
final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest);
|
||||
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(CREATED.getStatus()));
|
||||
|
||||
Request getNodeStats = new Request("GET", "/_nodes/stats/indexing_pressure");
|
||||
final Response nodeStats = getRestClient().performRequest(getNodeStats);
|
||||
Map<String, Object> nodeStatsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats.getEntity().getContent(), true);
|
||||
ArrayList<Object> values = new ArrayList<>(((Map<Object, Object>) nodeStatsMap.get("nodes")).values());
|
||||
assertThat(values.size(), equalTo(2));
|
||||
XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(0));
|
||||
Integer node1IndexingBytes = node1.get("indexing_pressure.total.coordinating_and_primary_bytes");
|
||||
Integer node1ReplicaBytes = node1.get("indexing_pressure.total.replica_bytes");
|
||||
Integer node1Rejections = node1.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
|
||||
XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(1));
|
||||
Integer node2IndexingBytes = node2.get("indexing_pressure.total.coordinating_and_primary_bytes");
|
||||
Integer node2ReplicaBytes = node2.get("indexing_pressure.total.replica_bytes");
|
||||
Integer node2Rejections = node2.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
|
||||
|
||||
if (node1IndexingBytes == 0) {
|
||||
assertThat(node2IndexingBytes, greaterThan(0));
|
||||
assertThat(node2IndexingBytes, lessThan(1024));
|
||||
} else {
|
||||
assertThat(node1IndexingBytes, greaterThan(0));
|
||||
assertThat(node1IndexingBytes, lessThan(1024));
|
||||
}
|
||||
|
||||
if (node1ReplicaBytes == 0) {
|
||||
assertThat(node2ReplicaBytes, greaterThan(0));
|
||||
assertThat(node2ReplicaBytes, lessThan(1024));
|
||||
} else {
|
||||
assertThat(node2ReplicaBytes, equalTo(0));
|
||||
assertThat(node1ReplicaBytes, lessThan(1024));
|
||||
}
|
||||
|
||||
assertThat(node1Rejections, equalTo(0));
|
||||
assertThat(node2Rejections, equalTo(0));
|
||||
|
||||
Request failedIndexingRequest = new Request("POST", "/index_name/_doc/");
|
||||
String largeString = randomAlphaOfLength(10000);
|
||||
failedIndexingRequest.setJsonEntity("{\"x\": " + largeString + "}");
|
||||
ResponseException exception = expectThrows(ResponseException.class, () -> getRestClient().performRequest(failedIndexingRequest));
|
||||
assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(TOO_MANY_REQUESTS.getStatus()));
|
||||
|
||||
Request getNodeStats2 = new Request("GET", "/_nodes/stats/indexing_pressure");
|
||||
final Response nodeStats2 = getRestClient().performRequest(getNodeStats2);
|
||||
Map<String, Object> nodeStatsMap2 = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats2.getEntity().getContent(),
|
||||
true);
|
||||
ArrayList<Object> values2 = new ArrayList<>(((Map<Object, Object>) nodeStatsMap2.get("nodes")).values());
|
||||
assertThat(values2.size(), equalTo(2));
|
||||
XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(0));
|
||||
node1Rejections = node1AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
|
||||
XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(1));
|
||||
node2Rejections = node2AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
|
||||
|
||||
if (node1Rejections == 0) {
|
||||
assertThat(node2Rejections, equalTo(1));
|
||||
} else {
|
||||
assertThat(node1Rejections, equalTo(1));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -44,7 +44,8 @@
|
|||
"process",
|
||||
"thread_pool",
|
||||
"transport",
|
||||
"discovery"
|
||||
"discovery",
|
||||
"indexing_pressure"
|
||||
],
|
||||
"description":"Limit the information returned to the specified metrics"
|
||||
}
|
||||
|
@ -69,7 +70,8 @@
|
|||
"process",
|
||||
"thread_pool",
|
||||
"transport",
|
||||
"discovery"
|
||||
"discovery",
|
||||
"indexing_pressure"
|
||||
],
|
||||
"description":"Limit the information returned to the specified metrics"
|
||||
},
|
||||
|
@ -98,7 +100,8 @@
|
|||
"process",
|
||||
"thread_pool",
|
||||
"transport",
|
||||
"discovery"
|
||||
"discovery",
|
||||
"indexing_pressure"
|
||||
],
|
||||
"description":"Limit the information returned to the specified metrics"
|
||||
},
|
||||
|
@ -145,7 +148,8 @@
|
|||
"process",
|
||||
"thread_pool",
|
||||
"transport",
|
||||
"discovery"
|
||||
"discovery",
|
||||
"indexing_pressure"
|
||||
],
|
||||
"description":"Limit the information returned to the specified metrics"
|
||||
},
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
---
|
||||
"Indexing pressure stats":
|
||||
- skip:
|
||||
version: " - 7.8.99"
|
||||
reason: "indexing_pressure was added in 7.9"
|
||||
features: [arbitrary_key]
|
||||
|
||||
- do:
|
||||
nodes.info: {}
|
||||
- set:
|
||||
nodes._arbitrary_key_: node_id
|
||||
|
||||
- do:
|
||||
nodes.stats:
|
||||
metric: [ indexing_pressure ]
|
||||
|
||||
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_bytes: 0 }
|
||||
- gte: { nodes.$node_id.indexing_pressure.total.replica_bytes: 0 }
|
||||
- gte: { nodes.$node_id.indexing_pressure.total.all_bytes: 0 }
|
||||
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_memory_limit_rejections: 0 }
|
||||
- gte: { nodes.$node_id.indexing_pressure.total.replica_memory_limit_rejections: 0 }
|
||||
- gte: { nodes.$node_id.indexing_pressure.current.coordinating_and_primary_bytes: 0 }
|
||||
- gte: { nodes.$node_id.indexing_pressure.current.replica_bytes: 0 }
|
||||
- gte: { nodes.$node_id.indexing_pressure.current.all_bytes: 0 }
|
||||
|
||||
# TODO:
|
||||
#
|
||||
# Change skipped version after backport
|
|
@ -16,11 +16,14 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.bulk;
|
||||
package org.elasticsearch.index;
|
||||
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.bulk.TransportShardBulkAction;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
|
@ -51,7 +54,7 @@ import static org.hamcrest.Matchers.greaterThan;
|
|||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
|
||||
public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
||||
public class IndexingPressureIT extends ESIntegTestCase {
|
||||
|
||||
// TODO: Add additional REST tests when metrics are exposed
|
||||
|
||||
|
@ -63,7 +66,6 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
// Need at least two threads because we are going to block one
|
||||
.put(unboundedWriteQueue)
|
||||
.build();
|
||||
}
|
||||
|
@ -134,16 +136,16 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
|
||||
replicationSendPointReached.await();
|
||||
|
||||
WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
|
||||
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
|
||||
WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
|
||||
IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
|
||||
IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
|
||||
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
|
||||
|
||||
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
|
||||
|
||||
latchBlockingReplicationSend.countDown();
|
||||
|
||||
|
@ -165,14 +167,15 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
final long secondBulkShardRequestSize = request.ramBytesUsed();
|
||||
|
||||
if (usePrimaryAsCoordinatingNode) {
|
||||
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize));
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(),
|
||||
greaterThan(bulkShardRequestSize + secondBulkRequestSize));
|
||||
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
} else {
|
||||
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes());
|
||||
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
}
|
||||
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
|
||||
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(),
|
||||
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertBusy(() -> assertThat(replicaWriteLimits.getCurrentReplicaBytes(),
|
||||
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));
|
||||
|
||||
replicaRelease.close();
|
||||
|
@ -180,12 +183,12 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
successFuture.actionGet();
|
||||
secondFuture.actionGet();
|
||||
|
||||
assertEquals(0, primaryWriteLimits.getWriteBytes());
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
|
||||
} finally {
|
||||
if (replicationSendPointReached.getCount() > 0) {
|
||||
replicationSendPointReached.countDown();
|
||||
|
@ -212,8 +215,8 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
|
||||
final long bulkRequestSize = bulkRequest.ramBytesUsed();
|
||||
final long bulkShardRequestSize = totalRequestSize;
|
||||
restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(),
|
||||
(long)(bulkShardRequestSize * 1.5) + "B").build());
|
||||
restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(),
|
||||
(long) (bulkShardRequestSize * 1.5) + "B").build());
|
||||
|
||||
assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
|
@ -229,17 +232,17 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
|
||||
final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
|
||||
|
||||
WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
|
||||
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
|
||||
WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
|
||||
IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
|
||||
IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
|
||||
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
|
||||
|
||||
assertBusy(() -> {
|
||||
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
|
||||
});
|
||||
|
||||
expectThrows(EsRejectedExecutionException.class, () -> {
|
||||
|
@ -256,12 +259,12 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
|
||||
successFuture.actionGet();
|
||||
|
||||
assertEquals(0, primaryWriteLimits.getWriteBytes());
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -276,7 +279,7 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
bulkRequest.add(request);
|
||||
}
|
||||
final long bulkShardRequestSize = totalRequestSize;
|
||||
restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(),
|
||||
restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(),
|
||||
(long)(bulkShardRequestSize * 1.5) + "B").build());
|
||||
|
||||
assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
|
||||
|
@ -293,17 +296,17 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
|
||||
final ActionFuture<BulkResponse> successFuture = client(primaryName).bulk(bulkRequest);
|
||||
|
||||
WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
|
||||
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
|
||||
WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
|
||||
IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
|
||||
IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
|
||||
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
|
||||
|
||||
assertBusy(() -> {
|
||||
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize));
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
|
||||
});
|
||||
|
||||
BulkResponse responses = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
|
||||
|
@ -314,17 +317,17 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
|
|||
|
||||
successFuture.actionGet();
|
||||
|
||||
assertEquals(0, primaryWriteLimits.getWriteBytes());
|
||||
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getWriteBytes());
|
||||
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
|
||||
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
|
||||
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
|
||||
}
|
||||
}
|
||||
|
||||
public void testWritesWillSucceedIfBelowThreshold() throws Exception {
|
||||
restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), "1MB").build());
|
||||
restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1MB").build());
|
||||
assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.discovery.DiscoveryStats;
|
||||
import org.elasticsearch.http.HttpStats;
|
||||
import org.elasticsearch.index.stats.IndexingPressureStats;
|
||||
import org.elasticsearch.indices.NodeIndicesStats;
|
||||
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
|
||||
import org.elasticsearch.ingest.IngestStats;
|
||||
|
@ -95,6 +96,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
@Nullable
|
||||
private AdaptiveSelectionStats adaptiveSelectionStats;
|
||||
|
||||
@Nullable
|
||||
private IndexingPressureStats indexingPressureStats;
|
||||
|
||||
public NodeStats(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
timestamp = in.readVLong();
|
||||
|
@ -125,6 +129,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
scriptCacheStats = scriptStats.toScriptCacheStats();
|
||||
}
|
||||
}
|
||||
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
|
||||
indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new);
|
||||
} else {
|
||||
indexingPressureStats = null;
|
||||
}
|
||||
}
|
||||
|
||||
public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices,
|
||||
|
@ -135,7 +144,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
@Nullable DiscoveryStats discoveryStats,
|
||||
@Nullable IngestStats ingestStats,
|
||||
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
|
||||
@Nullable ScriptCacheStats scriptCacheStats) {
|
||||
@Nullable ScriptCacheStats scriptCacheStats,
|
||||
@Nullable IndexingPressureStats indexingPressureStats) {
|
||||
super(node);
|
||||
this.timestamp = timestamp;
|
||||
this.indices = indices;
|
||||
|
@ -152,6 +162,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
this.ingestStats = ingestStats;
|
||||
this.adaptiveSelectionStats = adaptiveSelectionStats;
|
||||
this.scriptCacheStats = scriptCacheStats;
|
||||
this.indexingPressureStats = indexingPressureStats;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
|
@ -251,6 +262,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
return scriptCacheStats;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public IndexingPressureStats getIndexingPressureStats() {
|
||||
return indexingPressureStats;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
|
@ -277,6 +293,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
} if (out.getVersion().onOrAfter(Version.V_7_8_0) && out.getVersion().before(Version.V_7_9_0)) {
|
||||
out.writeOptionalWriteable(scriptCacheStats);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
|
||||
out.writeOptionalWriteable(indexingPressureStats);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -343,6 +362,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
|
|||
if (getScriptCacheStats() != null) {
|
||||
getScriptCacheStats().toXContent(builder, params);
|
||||
}
|
||||
if (getIndexingPressureStats() != null) {
|
||||
getIndexingPressureStats().toXContent(builder, params);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -224,7 +224,8 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
|
|||
DISCOVERY("discovery"),
|
||||
INGEST("ingest"),
|
||||
ADAPTIVE_SELECTION("adaptive_selection"),
|
||||
SCRIPT_CACHE("script_cache");
|
||||
SCRIPT_CACHE("script_cache"),
|
||||
INDEXING_PRESSURE("indexing_pressure"),;
|
||||
|
||||
private String metricName;
|
||||
|
||||
|
|
|
@ -83,7 +83,8 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
|
|||
NodesStatsRequest.Metric.DISCOVERY.containedIn(metrics),
|
||||
NodesStatsRequest.Metric.INGEST.containedIn(metrics),
|
||||
NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
|
||||
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics));
|
||||
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
|
||||
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics));
|
||||
}
|
||||
|
||||
public static class NodeStatsRequest extends BaseNodeRequest {
|
||||
|
|
|
@ -97,7 +97,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
|
|||
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
|
||||
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
|
||||
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
|
||||
true, true, true, false, true, false, false, false, false, false, true, false, false);
|
||||
true, true, true, false, true, false, false, false, false, false, true, false, false, false);
|
||||
List<ShardStats> shardsStats = new ArrayList<>();
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndexClosedException;
|
||||
|
@ -113,23 +114,23 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
private final NodeClient client;
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
|
||||
private final WriteMemoryLimits writeMemoryLimits;
|
||||
private final IndexingPressure indexingPressure;
|
||||
|
||||
@Inject
|
||||
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, IngestService ingestService,
|
||||
TransportShardBulkAction shardBulkAction, NodeClient client,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
AutoCreateIndex autoCreateIndex, WriteMemoryLimits writeMemoryLimits) {
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure) {
|
||||
this(threadPool, transportService, clusterService, ingestService, shardBulkAction, client, actionFilters,
|
||||
indexNameExpressionResolver, autoCreateIndex, writeMemoryLimits, System::nanoTime);
|
||||
indexNameExpressionResolver, autoCreateIndex, indexingPressure, System::nanoTime);
|
||||
}
|
||||
|
||||
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, IngestService ingestService,
|
||||
TransportShardBulkAction shardBulkAction, NodeClient client,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
AutoCreateIndex autoCreateIndex, WriteMemoryLimits writeMemoryLimits, LongSupplier relativeTimeProvider) {
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, LongSupplier relativeTimeProvider) {
|
||||
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
|
||||
Objects.requireNonNull(relativeTimeProvider);
|
||||
this.threadPool = threadPool;
|
||||
|
@ -141,7 +142,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
this.ingestForwarder = new IngestActionForwarder(transportService);
|
||||
this.client = client;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.writeMemoryLimits = writeMemoryLimits;
|
||||
this.indexingPressure = indexingPressure;
|
||||
clusterService.addStateApplier(this.ingestForwarder);
|
||||
}
|
||||
|
||||
|
@ -166,7 +167,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
@Override
|
||||
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
|
||||
long indexingBytes = bulkRequest.ramBytesUsed();
|
||||
final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes);
|
||||
final Releasable releasable = indexingPressure.markIndexingOperationStarted(indexingBytes);
|
||||
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
|
||||
try {
|
||||
doInternalExecute(task, bulkRequest, releasingListener);
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
|
@ -91,9 +92,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
|
||||
WriteMemoryLimits writeMemoryLimits) {
|
||||
IndexingPressure indexingPressure) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false, writeMemoryLimits);
|
||||
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false, indexingPressure);
|
||||
this.updateHelper = updateHelper;
|
||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||
}
|
||||
|
|
|
@ -1,92 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class WriteMemoryLimits {
|
||||
|
||||
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
|
||||
Setting.memorySizeSetting("indexing_limits.memory.limit", "10%", Setting.Property.NodeScope);
|
||||
|
||||
private final AtomicLong writeBytes = new AtomicLong(0);
|
||||
private final AtomicLong replicaWriteBytes = new AtomicLong(0);
|
||||
private final long writeLimits;
|
||||
|
||||
public WriteMemoryLimits(Settings settings) {
|
||||
this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
|
||||
}
|
||||
|
||||
public WriteMemoryLimits(Settings settings, ClusterSettings clusterSettings) {
|
||||
this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
|
||||
}
|
||||
|
||||
public Releasable markWriteOperationStarted(long bytes) {
|
||||
return markWriteOperationStarted(bytes, false);
|
||||
}
|
||||
|
||||
public Releasable markWriteOperationStarted(long bytes, boolean forceExecution) {
|
||||
long currentWriteLimits = this.writeLimits;
|
||||
long writeBytes = this.writeBytes.addAndGet(bytes);
|
||||
long replicaWriteBytes = this.replicaWriteBytes.get();
|
||||
long totalBytes = writeBytes + replicaWriteBytes;
|
||||
if (forceExecution == false && totalBytes > currentWriteLimits) {
|
||||
long bytesWithoutOperation = writeBytes - bytes;
|
||||
long totalBytesWithoutOperation = totalBytes - bytes;
|
||||
this.writeBytes.getAndAdd(-bytes);
|
||||
throw new EsRejectedExecutionException("rejected execution of write operation [" +
|
||||
"write_bytes=" + bytesWithoutOperation + ", " +
|
||||
"replica_write_bytes=" + replicaWriteBytes + ", " +
|
||||
"total_write_bytes=" + totalBytesWithoutOperation + ", " +
|
||||
"current_operation_bytes=" + bytes + ", " +
|
||||
"max_write_bytes=" + currentWriteLimits + "]", false);
|
||||
}
|
||||
return () -> this.writeBytes.getAndAdd(-bytes);
|
||||
}
|
||||
|
||||
public long getWriteBytes() {
|
||||
return writeBytes.get();
|
||||
}
|
||||
|
||||
public Releasable markReplicaWriteStarted(long bytes, boolean forceExecution) {
|
||||
long currentReplicaWriteLimits = (long) (this.writeLimits * 1.5);
|
||||
long replicaWriteBytes = this.replicaWriteBytes.getAndAdd(bytes);
|
||||
if (forceExecution == false && replicaWriteBytes > currentReplicaWriteLimits) {
|
||||
long replicaBytesWithoutOperation = replicaWriteBytes - bytes;
|
||||
this.replicaWriteBytes.getAndAdd(-bytes);
|
||||
throw new EsRejectedExecutionException("rejected execution of replica write operation [" +
|
||||
"replica_write_bytes=" + replicaBytesWithoutOperation + ", " +
|
||||
"current_replica_operation_bytes=" + bytes + ", " +
|
||||
"max_replica_write_bytes=" + currentReplicaWriteLimits + "]", false);
|
||||
}
|
||||
return () -> this.replicaWriteBytes.getAndAdd(-bytes);
|
||||
}
|
||||
|
||||
public long getReplicaWriteBytes() {
|
||||
return replicaWriteBytes.get();
|
||||
}
|
||||
}
|
|
@ -20,7 +20,7 @@ package org.elasticsearch.action.resync;
|
|||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.ReplicationOperation;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
|
@ -57,11 +57,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
|
|||
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters,
|
||||
WriteMemoryLimits writeMemoryLimits) {
|
||||
IndexingPressure indexingPressure) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
|
||||
true, /* we should never reject resync because of thread pool capacity on primary */
|
||||
writeMemoryLimits);
|
||||
indexingPressure);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.TransportActions;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
|
@ -61,26 +61,26 @@ public abstract class TransportWriteAction<
|
|||
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
|
||||
|
||||
private final boolean forceExecution;
|
||||
private final WriteMemoryLimits writeMemoryLimits;
|
||||
private final IndexingPressure indexingPressure;
|
||||
private final String executor;
|
||||
|
||||
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
|
||||
Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary,
|
||||
WriteMemoryLimits writeMemoryLimits) {
|
||||
IndexingPressure indexingPressure) {
|
||||
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
|
||||
// ThreadPool.Names.WRITE thread pool in this class.
|
||||
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
|
||||
this.executor = executor;
|
||||
this.forceExecution = forceExecutionOnPrimary;
|
||||
this.writeMemoryLimits = writeMemoryLimits;
|
||||
this.indexingPressure = indexingPressure;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Releasable checkOperationLimits(Request request) {
|
||||
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution);
|
||||
return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,7 +90,7 @@ public abstract class TransportWriteAction<
|
|||
if (rerouteWasLocal) {
|
||||
return () -> {};
|
||||
} else {
|
||||
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution);
|
||||
return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,7 +100,7 @@ public abstract class TransportWriteAction<
|
|||
|
||||
@Override
|
||||
protected Releasable checkReplicaLimits(ReplicaRequest request) {
|
||||
return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request), forceExecution);
|
||||
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution);
|
||||
}
|
||||
|
||||
protected long replicaOperationSize(ReplicaRequest request) {
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.common.settings;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
|
||||
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.search.TransportSearchAction;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
|
@ -556,7 +556,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
FsHealthService.ENABLED_SETTING,
|
||||
FsHealthService.REFRESH_INTERVAL_SETTING,
|
||||
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
|
||||
WriteMemoryLimits.MAX_INDEXING_BYTES)));
|
||||
IndexingPressure.MAX_INDEXING_BYTES)));
|
||||
|
||||
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.index.stats.IndexingPressureStats;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class IndexingPressure {
|
||||
|
||||
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
|
||||
Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope);
|
||||
|
||||
private final AtomicLong currentCoordinatingAndPrimaryBytes = new AtomicLong(0);
|
||||
private final AtomicLong currentReplicaBytes = new AtomicLong(0);
|
||||
private final AtomicLong totalCoordinatingAndPrimaryBytes = new AtomicLong(0);
|
||||
private final AtomicLong totalReplicaBytes = new AtomicLong(0);
|
||||
private final AtomicLong coordinatingAndPrimaryRejections = new AtomicLong(0);
|
||||
private final AtomicLong replicaRejections = new AtomicLong(0);
|
||||
|
||||
private final long primaryAndCoordinatingLimits;
|
||||
private final long replicaLimits;
|
||||
|
||||
public IndexingPressure(Settings settings) {
|
||||
this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
|
||||
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
|
||||
}
|
||||
|
||||
public Releasable markIndexingOperationStarted(long bytes) {
|
||||
return markIndexingOperationStarted(bytes, false);
|
||||
}
|
||||
|
||||
public Releasable markIndexingOperationStarted(long bytes, boolean forceExecution) {
|
||||
long writeBytes = this.currentCoordinatingAndPrimaryBytes.addAndGet(bytes);
|
||||
long replicaWriteBytes = this.currentReplicaBytes.get();
|
||||
long totalBytes = writeBytes + replicaWriteBytes;
|
||||
if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) {
|
||||
long bytesWithoutOperation = writeBytes - bytes;
|
||||
long totalBytesWithoutOperation = totalBytes - bytes;
|
||||
this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
|
||||
this.coordinatingAndPrimaryRejections.getAndIncrement();
|
||||
throw new EsRejectedExecutionException("rejected execution of operation [" +
|
||||
"coordinating_and_primary_bytes=" + bytesWithoutOperation + ", " +
|
||||
"replica_bytes=" + replicaWriteBytes + ", " +
|
||||
"all_bytes=" + totalBytesWithoutOperation + ", " +
|
||||
"operation_bytes=" + bytes + ", " +
|
||||
"max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
|
||||
}
|
||||
totalCoordinatingAndPrimaryBytes.getAndAdd(bytes);
|
||||
return () -> this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
|
||||
}
|
||||
|
||||
public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) {
|
||||
long replicaWriteBytes = this.currentReplicaBytes.getAndAdd(bytes);
|
||||
if (forceExecution == false && replicaWriteBytes > replicaLimits) {
|
||||
long replicaBytesWithoutOperation = replicaWriteBytes - bytes;
|
||||
this.currentReplicaBytes.getAndAdd(-bytes);
|
||||
this.replicaRejections.getAndIncrement();
|
||||
throw new EsRejectedExecutionException("rejected execution of replica operation [" +
|
||||
"replica_bytes=" + replicaBytesWithoutOperation + ", " +
|
||||
"replica_operation_bytes=" + bytes + ", " +
|
||||
"max_replica_bytes=" + replicaLimits + "]", false);
|
||||
}
|
||||
totalReplicaBytes.getAndAdd(bytes);
|
||||
return () -> this.currentReplicaBytes.getAndAdd(-bytes);
|
||||
}
|
||||
|
||||
public long getCurrentCoordinatingAndPrimaryBytes() {
|
||||
return currentCoordinatingAndPrimaryBytes.get();
|
||||
}
|
||||
|
||||
public long getCurrentReplicaBytes() {
|
||||
return currentReplicaBytes.get();
|
||||
}
|
||||
|
||||
public long getTotalCoordinatingAndPrimaryBytes() {
|
||||
return totalCoordinatingAndPrimaryBytes.get();
|
||||
}
|
||||
|
||||
public long getTotalReplicaBytes() {
|
||||
return totalReplicaBytes.get();
|
||||
}
|
||||
|
||||
public IndexingPressureStats stats() {
|
||||
return new IndexingPressureStats(totalCoordinatingAndPrimaryBytes.get(), totalReplicaBytes.get(),
|
||||
currentCoordinatingAndPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingAndPrimaryRejections.get(),
|
||||
replicaRejections.get());
|
||||
}
|
||||
}
|
|
@ -25,7 +25,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.WriteResponse;
|
||||
|
@ -80,7 +80,7 @@ public class RetentionLeaseSyncAction extends
|
|||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final WriteMemoryLimits writeMemoryLimits) {
|
||||
final IndexingPressure indexingPressure) {
|
||||
super(
|
||||
settings,
|
||||
ACTION_NAME,
|
||||
|
@ -92,7 +92,7 @@ public class RetentionLeaseSyncAction extends
|
|||
actionFilters,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
ThreadPool.Names.MANAGEMENT, false, writeMemoryLimits);
|
||||
ThreadPool.Names.MANAGEMENT, false, indexingPressure);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.stats;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ToXContentFragment;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class IndexingPressureStats implements Writeable, ToXContentFragment {
|
||||
|
||||
private final long totalCoordinatingAndPrimaryBytes;
|
||||
private final long totalReplicaBytes;
|
||||
private final long currentCoordinatingAndPrimaryBytes;
|
||||
private final long currentReplicaBytes;
|
||||
private final long coordinatingAndPrimaryRejections;
|
||||
private final long replicaRejections;
|
||||
|
||||
public IndexingPressureStats(StreamInput in) throws IOException {
|
||||
totalCoordinatingAndPrimaryBytes = in.readVLong();
|
||||
totalReplicaBytes = in.readVLong();
|
||||
currentCoordinatingAndPrimaryBytes = in.readVLong();
|
||||
currentReplicaBytes = in.readVLong();
|
||||
coordinatingAndPrimaryRejections = in.readVLong();
|
||||
replicaRejections = in.readVLong();
|
||||
}
|
||||
|
||||
public IndexingPressureStats(long totalCoordinatingAndPrimaryBytes, long totalReplicaBytes, long currentCoordinatingAndPrimaryBytes,
|
||||
long currentReplicaBytes, long coordinatingAndPrimaryRejections, long replicaRejections) {
|
||||
this.totalCoordinatingAndPrimaryBytes = totalCoordinatingAndPrimaryBytes;
|
||||
this.totalReplicaBytes = totalReplicaBytes;
|
||||
this.currentCoordinatingAndPrimaryBytes = currentCoordinatingAndPrimaryBytes;
|
||||
this.currentReplicaBytes = currentReplicaBytes;
|
||||
this.coordinatingAndPrimaryRejections = coordinatingAndPrimaryRejections;
|
||||
this.replicaRejections = replicaRejections;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVLong(totalCoordinatingAndPrimaryBytes);
|
||||
out.writeVLong(totalReplicaBytes);
|
||||
out.writeVLong(currentCoordinatingAndPrimaryBytes);
|
||||
out.writeVLong(currentReplicaBytes);
|
||||
out.writeVLong(coordinatingAndPrimaryRejections);
|
||||
out.writeVLong(replicaRejections);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject("indexing_pressure");
|
||||
builder.startObject("total");
|
||||
builder.field("coordinating_and_primary_bytes", totalCoordinatingAndPrimaryBytes);
|
||||
builder.field("replica_bytes", totalReplicaBytes);
|
||||
builder.field("all_bytes", totalReplicaBytes + totalCoordinatingAndPrimaryBytes);
|
||||
builder.field("coordinating_and_primary_memory_limit_rejections", coordinatingAndPrimaryRejections);
|
||||
builder.field("replica_memory_limit_rejections", replicaRejections);
|
||||
builder.endObject();
|
||||
builder.startObject("current");
|
||||
builder.field("coordinating_and_primary_bytes", currentCoordinatingAndPrimaryBytes);
|
||||
builder.field("replica_bytes", currentReplicaBytes);
|
||||
builder.field("all_bytes", currentCoordinatingAndPrimaryBytes + currentReplicaBytes);
|
||||
builder.endObject();
|
||||
return builder.endObject();
|
||||
}
|
||||
}
|
|
@ -31,7 +31,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.ActionModule;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
|
||||
import org.elasticsearch.action.search.SearchPhaseController;
|
||||
import org.elasticsearch.action.search.SearchTransportService;
|
||||
|
@ -547,6 +547,7 @@ public class Node implements Closeable {
|
|||
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
|
||||
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
|
||||
final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
|
||||
final IndexingPressure indexingLimits = new IndexingPressure(settings);
|
||||
|
||||
final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
|
||||
RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,
|
||||
|
@ -577,7 +578,7 @@ public class Node implements Closeable {
|
|||
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
|
||||
transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
|
||||
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
|
||||
searchTransportService);
|
||||
searchTransportService, indexingLimits);
|
||||
|
||||
final SearchService searchService = newSearchService(clusterService, indicesService,
|
||||
threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
|
||||
|
@ -595,7 +596,6 @@ public class Node implements Closeable {
|
|||
new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
|
||||
resourcesToClose.add(persistentTasksClusterService);
|
||||
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
|
||||
final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(settings);
|
||||
|
||||
modules.add(b -> {
|
||||
b.bind(Node.class).toInstance(this);
|
||||
|
@ -614,7 +614,7 @@ public class Node implements Closeable {
|
|||
b.bind(ScriptService.class).toInstance(scriptService);
|
||||
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
|
||||
b.bind(IngestService.class).toInstance(ingestService);
|
||||
b.bind(WriteMemoryLimits.class).toInstance(bulkIndexingLimits);
|
||||
b.bind(IndexingPressure.class).toInstance(indexingLimits);
|
||||
b.bind(UsageService.class).toInstance(usageService);
|
||||
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
|
||||
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.node;
|
||||
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.Build;
|
||||
import org.elasticsearch.Version;
|
||||
|
@ -59,6 +60,7 @@ public class NodeService implements Closeable {
|
|||
private final HttpServerTransport httpServerTransport;
|
||||
private final ResponseCollectorService responseCollectorService;
|
||||
private final SearchTransportService searchTransportService;
|
||||
private final IndexingPressure indexingPressure;
|
||||
|
||||
private final Discovery discovery;
|
||||
|
||||
|
@ -67,7 +69,7 @@ public class NodeService implements Closeable {
|
|||
CircuitBreakerService circuitBreakerService, ScriptService scriptService,
|
||||
@Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
|
||||
SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService,
|
||||
SearchTransportService searchTransportService) {
|
||||
SearchTransportService searchTransportService, IndexingPressure indexingPressure) {
|
||||
this.settings = settings;
|
||||
this.threadPool = threadPool;
|
||||
this.monitorService = monitorService;
|
||||
|
@ -82,6 +84,7 @@ public class NodeService implements Closeable {
|
|||
this.scriptService = scriptService;
|
||||
this.responseCollectorService = responseCollectorService;
|
||||
this.searchTransportService = searchTransportService;
|
||||
this.indexingPressure = indexingPressure;
|
||||
clusterService.addStateApplier(ingestService);
|
||||
}
|
||||
|
||||
|
@ -103,7 +106,8 @@ public class NodeService implements Closeable {
|
|||
|
||||
public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool,
|
||||
boolean fs, boolean transport, boolean http, boolean circuitBreaker,
|
||||
boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache) {
|
||||
boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache,
|
||||
boolean indexingPressure) {
|
||||
// for indices stats we want to include previous allocated shards stats as well (it will
|
||||
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
|
||||
return new NodeStats(transportService.getLocalNode(), System.currentTimeMillis(),
|
||||
|
@ -120,7 +124,8 @@ public class NodeService implements Closeable {
|
|||
discoveryStats ? discovery.stats() : null,
|
||||
ingest ? ingestService.stats() : null,
|
||||
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
|
||||
scriptCache ? scriptService.cacheStats() : null
|
||||
scriptCache ? scriptService.cacheStats() : null,
|
||||
indexingPressure ? this.indexingPressure.stats() : null
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -518,7 +518,7 @@ public class NodeStatsTests extends ESTestCase {
|
|||
//TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet
|
||||
return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats,
|
||||
fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats,
|
||||
ingestStats, adaptiveSelectionStats, scriptCacheStats);
|
||||
ingestStats, adaptiveSelectionStats, scriptCacheStats, null);
|
||||
}
|
||||
|
||||
private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
|
|||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
@ -121,7 +122,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
|
|||
final ExecutorService direct = EsExecutors.newDirectExecutorService();
|
||||
when(threadPool.executor(anyString())).thenReturn(direct);
|
||||
TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
|
||||
null, null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits(Settings.EMPTY)) {
|
||||
null, null, null, mock(ActionFilters.class), null, null, new IndexingPressure(Settings.EMPTY)) {
|
||||
@Override
|
||||
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
|
||||
AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
|
|||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -142,7 +143,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
|||
new AutoCreateIndex(
|
||||
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||
new IndexNameExpressionResolver()
|
||||
), new WriteMemoryLimits(SETTINGS)
|
||||
), new IndexingPressure(SETTINGS)
|
||||
);
|
||||
}
|
||||
@Override
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
@ -81,7 +82,7 @@ public class TransportBulkActionTests extends ESTestCase {
|
|||
super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null,
|
||||
null, new ActionFilters(Collections.emptySet()), new Resolver(),
|
||||
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()),
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
@ -240,7 +241,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
|
|||
actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
autoCreateIndex,
|
||||
new WriteMemoryLimits(Settings.EMPTY),
|
||||
new IndexingPressure(Settings.EMPTY),
|
||||
relativeTimeProvider);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.elasticsearch.action.resync;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -145,7 +145,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
|
|||
|
||||
final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
|
||||
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()),
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
|
||||
assertThat(action.globalBlockLevel(), nullValue());
|
||||
assertThat(action.indexBlockLevel(), nullValue());
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.action.support.replication;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ActionTestUtils;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
|
@ -367,7 +367,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||
x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
|
||||
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
|
||||
}
|
||||
|
@ -377,7 +377,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
super(settings, actionName, transportService, clusterService,
|
||||
mockIndicesService(clusterService), threadPool, shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
|
||||
new WriteMemoryLimits(settings));
|
||||
new IndexingPressure(settings));
|
||||
this.withDocumentFailureOnPrimary = false;
|
||||
this.withDocumentFailureOnReplica = false;
|
||||
}
|
||||
|
|
|
@ -153,13 +153,13 @@ public class DiskUsageTests extends ESTestCase {
|
|||
List<NodeStats> nodeStats = Arrays.asList(
|
||||
new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null,
|
||||
null),
|
||||
null, null),
|
||||
new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null,
|
||||
null),
|
||||
null, null),
|
||||
new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null,
|
||||
null)
|
||||
null, null)
|
||||
);
|
||||
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
|
||||
DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");
|
||||
|
@ -197,13 +197,13 @@ public class DiskUsageTests extends ESTestCase {
|
|||
List<NodeStats> nodeStats = Arrays.asList(
|
||||
new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null,
|
||||
null),
|
||||
null, null),
|
||||
new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null,
|
||||
null),
|
||||
null, null),
|
||||
new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null,
|
||||
null)
|
||||
null, null)
|
||||
);
|
||||
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages);
|
||||
DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1");
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
package org.elasticsearch.index.seqno;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ActionTestUtils;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
|
@ -105,7 +105,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
action.dispatchedShardOperationOnPrimary(request, indexShard,
|
||||
|
@ -142,7 +142,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
|
||||
|
@ -182,7 +182,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new WriteMemoryLimits(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
|
||||
assertNull(action.indexBlockLevel());
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAct
|
|||
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
|
||||
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.bulk.TransportBulkAction;
|
||||
|
@ -1483,7 +1483,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
new WriteMemoryLimits(settings))),
|
||||
new IndexingPressure(settings))),
|
||||
new GlobalCheckpointSyncAction(
|
||||
settings,
|
||||
transportService,
|
||||
|
@ -1509,7 +1509,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
mappingUpdatedAction.setClient(client);
|
||||
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
|
||||
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
|
||||
actionFilters, new WriteMemoryLimits(settings));
|
||||
actionFilters, new IndexingPressure(settings));
|
||||
actions.put(BulkAction.INSTANCE,
|
||||
new TransportBulkAction(threadPool, transportService, clusterService,
|
||||
new IngestService(
|
||||
|
@ -1517,7 +1517,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
|
||||
Collections.emptyList(), client),
|
||||
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
|
||||
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new WriteMemoryLimits(settings)
|
||||
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new IndexingPressure(settings)
|
||||
));
|
||||
final RestoreService restoreService = new RestoreService(
|
||||
clusterService, repositoriesService, allocationService,
|
||||
|
|
|
@ -84,7 +84,8 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
|
|||
.map(fsInfoPath -> diskUsageFunction.apply(discoveryNode, fsInfoPath))
|
||||
.toArray(FsInfo.Path[]::new)), nodeStats.getTransport(),
|
||||
nodeStats.getHttp(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(),
|
||||
nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getScriptCacheStats());
|
||||
nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getScriptCacheStats(),
|
||||
nodeStats.getIndexingPressureStats());
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExc
|
|||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
|
@ -1350,13 +1350,13 @@ public final class InternalTestCluster extends TestCluster {
|
|||
private void assertAllPendingWriteLimitsReleased() throws Exception {
|
||||
assertBusy(() -> {
|
||||
for (NodeAndClient nodeAndClient : nodes.values()) {
|
||||
WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name);
|
||||
final long writeBytes = writeMemoryLimits.getWriteBytes();
|
||||
IndexingPressure indexingPressure = getInstance(IndexingPressure.class, nodeAndClient.name);
|
||||
final long writeBytes = indexingPressure.getCurrentCoordinatingAndPrimaryBytes();
|
||||
if (writeBytes > 0) {
|
||||
throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node ["
|
||||
+ nodeAndClient.name + "].");
|
||||
}
|
||||
final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes();
|
||||
final long replicaWriteBytes = indexingPressure.getCurrentReplicaBytes();
|
||||
if (replicaWriteBytes > 0) {
|
||||
throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node ["
|
||||
+ nodeAndClient.name + "].");
|
||||
|
@ -2497,7 +2497,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node);
|
||||
CommonStatsFlags flags = new CommonStatsFlags(Flag.FieldData, Flag.QueryCache, Flag.Segments);
|
||||
NodeStats stats = nodeService.stats(flags,
|
||||
false, false, false, false, false, false, false, false, false, false, false, false, false);
|
||||
false, false, false, false, false, false, false, false, false, false, false, false, false, false);
|
||||
assertThat("Fielddata size must be 0 on node: " + stats.getNode(),
|
||||
stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
|
||||
assertThat("Query cache size must be 0 on node: " + stats.getNode(),
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -136,12 +136,12 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
|
|||
final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower");
|
||||
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class);
|
||||
IndexingPressure memoryLimits = getInstanceFromNode(IndexingPressure.class);
|
||||
final long finalSourceSize = sourceSize;
|
||||
assertBusy(() -> {
|
||||
// The actual write bytes will be greater due to other request fields. However, this test is
|
||||
// just spot checking that the bytes are incremented at all.
|
||||
assertTrue(memoryLimits.getWriteBytes() > finalSourceSize);
|
||||
assertTrue(memoryLimits.getCurrentCoordinatingAndPrimaryBytes() > finalSourceSize);
|
||||
});
|
||||
blocker.countDown();
|
||||
assertBusy(() -> {
|
||||
|
|
|
@ -9,7 +9,7 @@ package org.elasticsearch.xpack.ccr.action.bulk;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.WriteMemoryLimits;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
|
@ -38,7 +38,7 @@ import java.util.List;
|
|||
public class TransportBulkShardOperationsAction
|
||||
extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
|
||||
|
||||
private final WriteMemoryLimits writeMemoryLimits;
|
||||
private final IndexingPressure indexingPressure;
|
||||
|
||||
@Inject
|
||||
public TransportBulkShardOperationsAction(
|
||||
|
@ -49,7 +49,7 @@ public class TransportBulkShardOperationsAction
|
|||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final WriteMemoryLimits writeMemoryLimits) {
|
||||
final IndexingPressure indexingPressure) {
|
||||
super(
|
||||
settings,
|
||||
BulkShardOperationsAction.NAME,
|
||||
|
@ -61,14 +61,14 @@ public class TransportBulkShardOperationsAction
|
|||
actionFilters,
|
||||
BulkShardOperationsRequest::new,
|
||||
BulkShardOperationsRequest::new,
|
||||
ThreadPool.Names.WRITE, false, writeMemoryLimits);
|
||||
this.writeMemoryLimits = writeMemoryLimits;
|
||||
ThreadPool.Names.WRITE, false, indexingPressure);
|
||||
this.indexingPressure = indexingPressure;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener) {
|
||||
// This is executed on the follower coordinator node and we need to mark the bytes.
|
||||
Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
|
||||
Releasable releasable = indexingPressure.markIndexingOperationStarted(primaryOperationSize(request));
|
||||
ActionListener<BulkShardOperationsResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
|
||||
try {
|
||||
super.doExecute(task, request, releasingListener);
|
||||
|
|
|
@ -641,7 +641,7 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
|
|||
IntStream.range(0, pipelineNames.size()).boxed().collect(Collectors.toMap(pipelineNames::get, processorStats::get)));
|
||||
return new NodeStats(mock(DiscoveryNode.class),
|
||||
Instant.now().toEpochMilli(), null, null, null, null, null, null, null, null,
|
||||
null, null, null, ingestStats, null, null);
|
||||
null, null, null, ingestStats, null, null, null);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -300,7 +300,7 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
|
|||
IntStream.range(0, pipelineids.size()).boxed().collect(Collectors.toMap(pipelineids::get, processorStats::get)));
|
||||
return new NodeStats(mock(DiscoveryNode.class),
|
||||
Instant.now().toEpochMilli(), null, null, null, null, null, null, null, null,
|
||||
null, null, null, ingestStats, null, null);
|
||||
null, null, null, ingestStats, null, null, null);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -366,6 +366,7 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa
|
|||
emptySet(),
|
||||
Version.V_6_0_0_beta1);
|
||||
|
||||
return new NodeStats(discoveryNode, no, indices, os, process, jvm, threadPool, fs, null, null, null, null, null, null, null, null);
|
||||
return new NodeStats(discoveryNode, no, indices, os, process, jvm, threadPool, fs, null, null, null, null, null, null, null, null,
|
||||
null);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue