Separate coordinating and primary bytes in stats (#59487)

Currently we combine coordinating and primary bytes into a single bucket
for indexing pressure stats. This makes sense for rejection logic.
However, for metrics it would be useful to separate them.
This commit is contained in:
Tim Brooks 2020-07-14 12:22:42 -06:00
parent 70fe553ce0
commit 408a07f96a
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
11 changed files with 280 additions and 114 deletions

View File

@ -76,32 +76,44 @@ public class IndexingPressureRestIT extends HttpSmokeTestCase {
ArrayList<Object> values = new ArrayList<>(((Map<Object, Object>) nodeStatsMap.get("nodes")).values());
assertThat(values.size(), equalTo(2));
XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(0));
Integer node1IndexingBytes = node1.get("indexing_pressure.total.coordinating_and_primary_bytes");
Integer node1CombinedBytes = node1.get("indexing_pressure.total.combined_coordinating_and_primary_bytes");
Integer node1PrimaryBytes = node1.get("indexing_pressure.total.primary_bytes");
Integer node1ReplicaBytes = node1.get("indexing_pressure.total.replica_bytes");
Integer node1Rejections = node1.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
Integer node1CoordinatingRejections = node1.get("indexing_pressure.total.coordinating_rejections");
Integer node1PrimaryRejections = node1.get("indexing_pressure.total.primary_rejections");
XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(1));
Integer node2IndexingBytes = node2.get("indexing_pressure.total.coordinating_and_primary_bytes");
Integer node2IndexingBytes = node2.get("indexing_pressure.total.combined_coordinating_and_primary_bytes");
Integer node2PrimaryBytes = node2.get("indexing_pressure.total.primary_bytes");
Integer node2ReplicaBytes = node2.get("indexing_pressure.total.replica_bytes");
Integer node2Rejections = node2.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
Integer node2CoordinatingRejections = node2.get("indexing_pressure.total.coordinating_rejections");
Integer node2PrimaryRejections = node2.get("indexing_pressure.total.primary_rejections");
if (node1IndexingBytes == 0) {
if (node1CombinedBytes == 0) {
assertThat(node2IndexingBytes, greaterThan(0));
assertThat(node2IndexingBytes, lessThan(1024));
} else {
assertThat(node1IndexingBytes, greaterThan(0));
assertThat(node1IndexingBytes, lessThan(1024));
assertThat(node1CombinedBytes, greaterThan(0));
assertThat(node1CombinedBytes, lessThan(1024));
}
if (node1ReplicaBytes == 0) {
assertThat(node1PrimaryBytes, greaterThan(0));
assertThat(node1PrimaryBytes, lessThan(1024));
assertThat(node2ReplicaBytes, greaterThan(0));
assertThat(node2ReplicaBytes, lessThan(1024));
} else {
assertThat(node2PrimaryBytes, greaterThan(0));
assertThat(node2PrimaryBytes, lessThan(1024));
assertThat(node2ReplicaBytes, equalTo(0));
assertThat(node1ReplicaBytes, lessThan(1024));
}
assertThat(node1Rejections, equalTo(0));
assertThat(node2Rejections, equalTo(0));
assertThat(node1CoordinatingRejections, equalTo(0));
assertThat(node1PrimaryRejections, equalTo(0));
assertThat(node2CoordinatingRejections, equalTo(0));
assertThat(node2PrimaryRejections, equalTo(0));
Request failedIndexingRequest = new Request("POST", "/index_name/_doc/");
String largeString = randomAlphaOfLength(10000);
@ -116,14 +128,19 @@ public class IndexingPressureRestIT extends HttpSmokeTestCase {
ArrayList<Object> values2 = new ArrayList<>(((Map<Object, Object>) nodeStatsMap2.get("nodes")).values());
assertThat(values2.size(), equalTo(2));
XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(0));
node1Rejections = node1AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
node1CoordinatingRejections = node1AfterRejection.get("indexing_pressure.total.coordinating_rejections");
node1PrimaryRejections = node1.get("indexing_pressure.total.primary_rejections");
XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(1));
node2Rejections = node2AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
node2CoordinatingRejections = node2AfterRejection.get("indexing_pressure.total.coordinating_rejections");
node2PrimaryRejections = node2AfterRejection.get("indexing_pressure.total.primary_rejections");
if (node1Rejections == 0) {
assertThat(node2Rejections, equalTo(1));
if (node1CoordinatingRejections == 0) {
assertThat(node2CoordinatingRejections, equalTo(1));
} else {
assertThat(node1Rejections, equalTo(1));
assertThat(node1CoordinatingRejections, equalTo(1));
}
assertThat(node1PrimaryRejections, equalTo(0));
assertThat(node2PrimaryRejections, equalTo(0));
}
}

View File

@ -14,12 +14,19 @@
nodes.stats:
metric: [ indexing_pressure ]
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.combined_coordinating_and_primary_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.primary_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.replica_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.all_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_memory_limit_rejections: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.replica_memory_limit_rejections: 0 }
- gte: { nodes.$node_id.indexing_pressure.current.coordinating_and_primary_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_rejections: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.primary_rejections: 0 }
- gte: { nodes.$node_id.indexing_pressure.total.replica_rejections: 0 }
- gte: { nodes.$node_id.indexing_pressure.current.combined_coordinating_and_primary_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.current.coordinating_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.current.primary_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.current.replica_bytes: 0 }
- gte: { nodes.$node_id.indexing_pressure.current.all_bytes: 0 }

View File

@ -56,8 +56,6 @@ import static org.hamcrest.Matchers.instanceOf;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
public class IndexingPressureIT extends ESIntegTestCase {
// TODO: Add additional REST tests when metrics are exposed
public static final String INDEX_NAME = "test";
private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build();
@ -140,11 +138,19 @@ public class IndexingPressureIT extends ESIntegTestCase {
IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getCurrentPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingBytes());
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
latchBlockingReplicationSend.countDown();
@ -167,14 +173,25 @@ public class IndexingPressureIT extends ESIntegTestCase {
final long secondBulkShardRequestSize = request.ramBytesUsed();
if (usePrimaryAsCoordinatingNode) {
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(),
greaterThan(bulkShardRequestSize + secondBulkRequestSize));
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertBusy(() -> {
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(),
greaterThan(bulkShardRequestSize + secondBulkRequestSize));
assertEquals(secondBulkRequestSize, primaryWriteLimits.getCurrentCoordinatingBytes());
assertThat(primaryWriteLimits.getCurrentPrimaryBytes(),
greaterThan(bulkShardRequestSize + secondBulkRequestSize));
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes());
});
} else {
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes());
}
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertBusy(() -> assertThat(replicaWriteLimits.getCurrentReplicaBytes(),
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));
@ -183,11 +200,19 @@ public class IndexingPressureIT extends ESIntegTestCase {
successFuture.actionGet();
secondFuture.actionGet();
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingBytes());
assertEquals(0, primaryWriteLimits.getCurrentPrimaryBytes());
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
} finally {
if (replicationSendPointReached.getCount() > 0) {
@ -237,11 +262,11 @@ public class IndexingPressureIT extends ESIntegTestCase {
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
assertBusy(() -> {
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize));
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
});
@ -259,11 +284,11 @@ public class IndexingPressureIT extends ESIntegTestCase {
successFuture.actionGet();
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
}
}
@ -301,11 +326,11 @@ public class IndexingPressureIT extends ESIntegTestCase {
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
assertBusy(() -> {
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
});
@ -317,11 +342,11 @@ public class IndexingPressureIT extends ESIntegTestCase {
successFuture.actionGet();
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
}
}

View File

@ -167,7 +167,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
long indexingBytes = bulkRequest.ramBytesUsed();
final Releasable releasable = indexingPressure.markIndexingOperationStarted(indexingBytes);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
try {
doInternalExecute(task, bulkRequest, releasingListener);

View File

@ -180,7 +180,15 @@ public abstract class TransportReplicationAction<
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
assert request.shardId() != null : "request shardId must be set";
new ReroutePhase((ReplicationTask) task, request, listener).run();
runReroutePhase(task, request, listener, true);
}
private void runReroutePhase(Task task, Request request, ActionListener<Response> listener, boolean initiatedByNodeClient) {
try {
new ReroutePhase((ReplicationTask) task, request, listener, initiatedByNodeClient).run();
} catch (RuntimeException e) {
listener.onFailure(e);
}
}
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy() {
@ -278,7 +286,7 @@ public abstract class TransportReplicationAction<
Releasable releasable = checkOperationLimits(request);
ActionListener<Response> listener =
ActionListener.runBefore(new ChannelActionListener<>(channel, actionName, request), releasable::close);
execute(task, request, listener);
runReroutePhase(task, request, listener, false);
}
protected Releasable checkOperationLimits(final Request request) {
@ -286,7 +294,8 @@ public abstract class TransportReplicationAction<
}
protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute());
Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute(),
request.localRerouteInitiatedByNodeClient());
ActionListener<Response> listener =
ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close);
@ -297,7 +306,7 @@ public abstract class TransportReplicationAction<
}
}
protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) {
protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal, boolean localRerouteInitiatedByNodeClient) {
return () -> {};
}
@ -659,12 +668,18 @@ public abstract class TransportReplicationAction<
final class ReroutePhase extends AbstractRunnable {
private final ActionListener<Response> listener;
private final Request request;
private final boolean initiatedByNodeClient;
private final ReplicationTask task;
private final ClusterStateObserver observer;
private final AtomicBoolean finished = new AtomicBoolean();
ReroutePhase(ReplicationTask task, Request request, ActionListener<Response> listener) {
this(task, request, listener, false);
}
ReroutePhase(ReplicationTask task, Request request, ActionListener<Response> listener, boolean initiatedByNodeClient) {
this.request = request;
this.initiatedByNodeClient = initiatedByNodeClient;
if (task != null) {
this.request.setParentTask(clusterService.localNode().getId(), task.getId());
}
@ -750,7 +765,8 @@ public abstract class TransportReplicationAction<
transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
}
performAction(node, transportPrimaryAction, true,
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true));
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true,
initiatedByNodeClient));
}
private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
@ -1104,25 +1120,31 @@ public abstract class TransportReplicationAction<
private final R request;
// Indicates if this primary shard request originated by a reroute on this local node.
private final boolean sentFromLocalReroute;
// Indicates if this local reroute was initiated by the NodeClient executing a transport action. This
// is only true if sentFromLocalReroute is true.
private final boolean localRerouteInitiatedByNodeClient;
public ConcreteShardRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
targetAllocationID = in.readString();
primaryTerm = in.readVLong();
sentFromLocalReroute = false;
localRerouteInitiatedByNodeClient = false;
request = requestReader.read(in);
}
public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) {
this(request, targetAllocationID, primaryTerm, false);
this(request, targetAllocationID, primaryTerm, false, false);
}
public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute) {
public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute,
boolean localRerouteInitiatedByNodeClient) {
Objects.requireNonNull(request);
Objects.requireNonNull(targetAllocationID);
this.request = request;
this.targetAllocationID = targetAllocationID;
this.primaryTerm = primaryTerm;
this.sentFromLocalReroute = sentFromLocalReroute;
this.localRerouteInitiatedByNodeClient = localRerouteInitiatedByNodeClient;
}
@Override
@ -1155,6 +1177,7 @@ public abstract class TransportReplicationAction<
// the local transport. It should never be serialized to be sent over the wire. If it is sent over
// the wire, then it was NOT sent from a local reroute.
assert sentFromLocalReroute == false;
assert localRerouteInitiatedByNodeClient == false;
out.writeString(targetAllocationID);
out.writeVLong(primaryTerm);
request.writeTo(out);
@ -1164,6 +1187,10 @@ public abstract class TransportReplicationAction<
return sentFromLocalReroute;
}
public boolean localRerouteInitiatedByNodeClient() {
return localRerouteInitiatedByNodeClient;
}
public R getRequest() {
return request;
}

View File

@ -80,17 +80,24 @@ public abstract class TransportWriteAction<
@Override
protected Releasable checkOperationLimits(Request request) {
return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution);
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution);
}
@Override
protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) {
// If this primary request was submitted by a reroute performed on this local node, we have already
// accounted the bytes.
protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal, boolean localRerouteInitiatedByNodeClient) {
if (rerouteWasLocal) {
return () -> {};
// If this primary request was received from a local reroute initiated by the node client, we
// must mark a new primary operation local to the coordinating node.
if (localRerouteInitiatedByNodeClient) {
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request));
} else {
return () -> {};
}
} else {
return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution);
// If this primary request was received directly from the network, we must mark a new primary
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
// primary delegation, after the primary relocation hand-off.
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution);
}
}

View File

@ -33,11 +33,18 @@ public class IndexingPressure {
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope);
private final AtomicLong currentCoordinatingAndPrimaryBytes = new AtomicLong(0);
private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
private final AtomicLong currentCoordinatingBytes = new AtomicLong(0);
private final AtomicLong currentPrimaryBytes = new AtomicLong(0);
private final AtomicLong currentReplicaBytes = new AtomicLong(0);
private final AtomicLong totalCoordinatingAndPrimaryBytes = new AtomicLong(0);
private final AtomicLong totalCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
private final AtomicLong totalCoordinatingBytes = new AtomicLong(0);
private final AtomicLong totalPrimaryBytes = new AtomicLong(0);
private final AtomicLong totalReplicaBytes = new AtomicLong(0);
private final AtomicLong coordinatingAndPrimaryRejections = new AtomicLong(0);
private final AtomicLong coordinatingRejections = new AtomicLong(0);
private final AtomicLong primaryRejections = new AtomicLong(0);
private final AtomicLong replicaRejections = new AtomicLong(0);
private final long primaryAndCoordinatingLimits;
@ -48,28 +55,60 @@ public class IndexingPressure {
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
}
public Releasable markIndexingOperationStarted(long bytes) {
return markIndexingOperationStarted(bytes, false);
}
public Releasable markIndexingOperationStarted(long bytes, boolean forceExecution) {
long writeBytes = this.currentCoordinatingAndPrimaryBytes.addAndGet(bytes);
public Releasable markCoordinatingOperationStarted(long bytes) {
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long replicaWriteBytes = this.currentReplicaBytes.get();
long totalBytes = writeBytes + replicaWriteBytes;
if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) {
long bytesWithoutOperation = writeBytes - bytes;
long totalBytes = combinedBytes + replicaWriteBytes;
if (totalBytes > primaryAndCoordinatingLimits) {
long bytesWithoutOperation = combinedBytes - bytes;
long totalBytesWithoutOperation = totalBytes - bytes;
this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.coordinatingAndPrimaryRejections.getAndIncrement();
throw new EsRejectedExecutionException("rejected execution of operation [" +
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.coordinatingRejections.getAndIncrement();
throw new EsRejectedExecutionException("rejected execution of coordinating operation [" +
"coordinating_and_primary_bytes=" + bytesWithoutOperation + ", " +
"replica_bytes=" + replicaWriteBytes + ", " +
"all_bytes=" + totalBytesWithoutOperation + ", " +
"operation_bytes=" + bytes + ", " +
"max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
}
totalCoordinatingAndPrimaryBytes.getAndAdd(bytes);
return () -> this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
currentCoordinatingBytes.getAndAdd(bytes);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalCoordinatingBytes.getAndAdd(bytes);
return () -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentCoordinatingBytes.getAndAdd(-bytes);
};
}
public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(long bytes) {
currentPrimaryBytes.getAndAdd(bytes);
totalPrimaryBytes.getAndAdd(bytes);
return () -> this.currentPrimaryBytes.getAndAdd(-bytes);
}
public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution) {
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long replicaWriteBytes = this.currentReplicaBytes.get();
long totalBytes = combinedBytes + replicaWriteBytes;
if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) {
long bytesWithoutOperation = combinedBytes - bytes;
long totalBytesWithoutOperation = totalBytes - bytes;
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.primaryRejections.getAndIncrement();
throw new EsRejectedExecutionException("rejected execution of primary operation [" +
"coordinating_and_primary_bytes=" + bytesWithoutOperation + ", " +
"replica_bytes=" + replicaWriteBytes + ", " +
"all_bytes=" + totalBytesWithoutOperation + ", " +
"operation_bytes=" + bytes + ", " +
"max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
}
currentPrimaryBytes.getAndAdd(bytes);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalPrimaryBytes.getAndAdd(bytes);
return () -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentPrimaryBytes.getAndAdd(-bytes);
};
}
public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) {
@ -87,25 +126,26 @@ public class IndexingPressure {
return () -> this.currentReplicaBytes.getAndAdd(-bytes);
}
public long getCurrentCoordinatingAndPrimaryBytes() {
return currentCoordinatingAndPrimaryBytes.get();
public long getCurrentCombinedCoordinatingAndPrimaryBytes() {
return currentCombinedCoordinatingAndPrimaryBytes.get();
}
public long getCurrentCoordinatingBytes() {
return currentCoordinatingBytes.get();
}
public long getCurrentPrimaryBytes() {
return currentPrimaryBytes.get();
}
public long getCurrentReplicaBytes() {
return currentReplicaBytes.get();
}
public long getTotalCoordinatingAndPrimaryBytes() {
return totalCoordinatingAndPrimaryBytes.get();
}
public long getTotalReplicaBytes() {
return totalReplicaBytes.get();
}
public IndexingPressureStats stats() {
return new IndexingPressureStats(totalCoordinatingAndPrimaryBytes.get(), totalReplicaBytes.get(),
currentCoordinatingAndPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingAndPrimaryRejections.get(),
replicaRejections.get());
return new IndexingPressureStats(totalCombinedCoordinatingAndPrimaryBytes.get(), totalCoordinatingBytes.get(),
totalPrimaryBytes.get(), totalReplicaBytes.get(), currentCombinedCoordinatingAndPrimaryBytes.get(),
currentCoordinatingBytes.get(), currentPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingRejections.get(),
primaryRejections.get(), replicaRejections.get());
}
}

View File

@ -29,39 +29,67 @@ import java.io.IOException;
public class IndexingPressureStats implements Writeable, ToXContentFragment {
private final long totalCoordinatingAndPrimaryBytes;
private final long totalCombinedCoordinatingAndPrimaryBytes;
private final long totalCoordinatingBytes;
private final long totalPrimaryBytes;
private final long totalReplicaBytes;
private final long currentCoordinatingAndPrimaryBytes;
private final long currentCombinedCoordinatingAndPrimaryBytes;
private final long currentCoordinatingBytes;
private final long currentPrimaryBytes;
private final long currentReplicaBytes;
private final long coordinatingAndPrimaryRejections;
private final long coordinatingRejections;
private final long primaryRejections;
private final long replicaRejections;
public IndexingPressureStats(StreamInput in) throws IOException {
totalCoordinatingAndPrimaryBytes = in.readVLong();
totalCombinedCoordinatingAndPrimaryBytes = in.readVLong();
totalCoordinatingBytes = in.readVLong();
totalPrimaryBytes = in.readVLong();
totalReplicaBytes = in.readVLong();
currentCoordinatingAndPrimaryBytes = in.readVLong();
currentCombinedCoordinatingAndPrimaryBytes = in.readVLong();
currentCoordinatingBytes = in.readVLong();
currentPrimaryBytes = in.readVLong();
currentReplicaBytes = in.readVLong();
coordinatingAndPrimaryRejections = in.readVLong();
coordinatingRejections = in.readVLong();
primaryRejections = in.readVLong();
replicaRejections = in.readVLong();
}
public IndexingPressureStats(long totalCoordinatingAndPrimaryBytes, long totalReplicaBytes, long currentCoordinatingAndPrimaryBytes,
long currentReplicaBytes, long coordinatingAndPrimaryRejections, long replicaRejections) {
this.totalCoordinatingAndPrimaryBytes = totalCoordinatingAndPrimaryBytes;
public IndexingPressureStats(long totalCombinedCoordinatingAndPrimaryBytes, long totalCoordinatingBytes, long totalPrimaryBytes,
long totalReplicaBytes, long currentCombinedCoordinatingAndPrimaryBytes, long currentCoordinatingBytes,
long currentPrimaryBytes, long currentReplicaBytes, long coordinatingRejections, long primaryRejections,
long replicaRejections) {
this.totalCombinedCoordinatingAndPrimaryBytes = totalCombinedCoordinatingAndPrimaryBytes;
this.totalCoordinatingBytes = totalCoordinatingBytes;
this.totalPrimaryBytes = totalPrimaryBytes;
this.totalReplicaBytes = totalReplicaBytes;
this.currentCoordinatingAndPrimaryBytes = currentCoordinatingAndPrimaryBytes;
this.currentCombinedCoordinatingAndPrimaryBytes = currentCombinedCoordinatingAndPrimaryBytes;
this.currentCoordinatingBytes = currentCoordinatingBytes;
this.currentPrimaryBytes = currentPrimaryBytes;
this.currentReplicaBytes = currentReplicaBytes;
this.coordinatingAndPrimaryRejections = coordinatingAndPrimaryRejections;
this.coordinatingRejections = coordinatingRejections;
this.primaryRejections = primaryRejections;
this.replicaRejections = replicaRejections;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalCoordinatingAndPrimaryBytes);
out.writeVLong(totalCombinedCoordinatingAndPrimaryBytes);
out.writeVLong(totalCoordinatingBytes);
out.writeVLong(totalPrimaryBytes);
out.writeVLong(totalReplicaBytes);
out.writeVLong(currentCoordinatingAndPrimaryBytes);
out.writeVLong(currentCombinedCoordinatingAndPrimaryBytes);
out.writeVLong(currentCoordinatingBytes);
out.writeVLong(currentPrimaryBytes);
out.writeVLong(currentReplicaBytes);
out.writeVLong(coordinatingAndPrimaryRejections);
out.writeVLong(coordinatingRejections);
out.writeVLong(primaryRejections);
out.writeVLong(replicaRejections);
}
@ -69,16 +97,21 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("indexing_pressure");
builder.startObject("total");
builder.field("coordinating_and_primary_bytes", totalCoordinatingAndPrimaryBytes);
builder.field("combined_coordinating_and_primary_bytes", totalCombinedCoordinatingAndPrimaryBytes);
builder.field("coordinating_bytes", totalCoordinatingBytes);
builder.field("primary_bytes", totalPrimaryBytes);
builder.field("replica_bytes", totalReplicaBytes);
builder.field("all_bytes", totalReplicaBytes + totalCoordinatingAndPrimaryBytes);
builder.field("coordinating_and_primary_memory_limit_rejections", coordinatingAndPrimaryRejections);
builder.field("replica_memory_limit_rejections", replicaRejections);
builder.field("all_bytes", totalReplicaBytes + totalCombinedCoordinatingAndPrimaryBytes);
builder.field("coordinating_rejections", coordinatingRejections);
builder.field("primary_rejections", primaryRejections);
builder.field("replica_rejections", replicaRejections);
builder.endObject();
builder.startObject("current");
builder.field("coordinating_and_primary_bytes", currentCoordinatingAndPrimaryBytes);
builder.field("combined_coordinating_and_primary_bytes", currentCombinedCoordinatingAndPrimaryBytes);
builder.field("coordinating_bytes", currentCoordinatingBytes);
builder.field("primary_bytes", currentPrimaryBytes);
builder.field("replica_bytes", currentReplicaBytes);
builder.field("all_bytes", currentCoordinatingAndPrimaryBytes + currentReplicaBytes);
builder.field("all_bytes", currentCombinedCoordinatingAndPrimaryBytes + currentReplicaBytes);
builder.endObject();
return builder.endObject();
}

View File

@ -1351,14 +1351,24 @@ public final class InternalTestCluster extends TestCluster {
assertBusy(() -> {
for (NodeAndClient nodeAndClient : nodes.values()) {
IndexingPressure indexingPressure = getInstance(IndexingPressure.class, nodeAndClient.name);
final long writeBytes = indexingPressure.getCurrentCoordinatingAndPrimaryBytes();
if (writeBytes > 0) {
throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node ["
final long combinedBytes = indexingPressure.getCurrentCombinedCoordinatingAndPrimaryBytes();
if (combinedBytes > 0) {
throw new AssertionError("pending combined bytes [" + combinedBytes + "] bytes on node ["
+ nodeAndClient.name + "].");
}
final long coordinatingBytes = indexingPressure.getCurrentCoordinatingBytes();
if (coordinatingBytes > 0) {
throw new AssertionError("pending coordinating bytes [" + coordinatingBytes + "] bytes on node ["
+ nodeAndClient.name + "].");
}
final long primaryBytes = indexingPressure.getCurrentPrimaryBytes();
if (primaryBytes > 0) {
throw new AssertionError("pending primary bytes [" + primaryBytes + "] bytes on node ["
+ nodeAndClient.name + "].");
}
final long replicaWriteBytes = indexingPressure.getCurrentReplicaBytes();
if (replicaWriteBytes > 0) {
throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node ["
throw new AssertionError("pending replica write bytes [" + combinedBytes + "] bytes on node ["
+ nodeAndClient.name + "].");
}
}

View File

@ -141,7 +141,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
assertBusy(() -> {
// The actual write bytes will be greater due to other request fields. However, this test is
// just spot checking that the bytes are incremented at all.
assertTrue(memoryLimits.getCurrentCoordinatingAndPrimaryBytes() > finalSourceSize);
assertTrue(memoryLimits.getCurrentCombinedCoordinatingAndPrimaryBytes() > finalSourceSize);
});
blocker.countDown();
assertBusy(() -> {

View File

@ -68,7 +68,7 @@ public class TransportBulkShardOperationsAction
@Override
protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener) {
// This is executed on the follower coordinator node and we need to mark the bytes.
Releasable releasable = indexingPressure.markIndexingOperationStarted(primaryOperationSize(request));
Releasable releasable = indexingPressure.markCoordinatingOperationStarted(primaryOperationSize(request));
ActionListener<BulkShardOperationsResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
try {
super.doExecute(task, request, releasingListener);