From dc9e364ff2dc0f7d52e3baff09abe99d98a92830 Mon Sep 17 00:00:00 2001
From: Tim Brooks
Date: Thu, 2 Jul 2020 19:48:19 -0600
Subject: [PATCH] Count coordinating and primary bytes as write bytes (#58984)

This is a follow-up to #57573. This commit combines coordinating and
primary bytes under the same "write" bucket. Double accounting is
prevented by only accounting the bytes at either the reroute phase or
the primary phase. TransportBulkAction calls execute directly, so the
operations handler is skipped and the bytes are not double accounted.
---
 .../action/bulk/WriteMemoryLimitsIT.java      | 70 ++++++++++++-------
 .../action/bulk/TransportBulkAction.java      |  2 +-
 .../action/bulk/WriteMemoryLimits.java        | 34 ++++-----
 .../TransportResyncReplicationAction.java     |  5 ++
 .../TransportReplicationAction.java           | 27 +++++--
 .../replication/TransportWriteAction.java     | 14 ++--
 .../test/InternalTestCluster.java             | 17 ++---
 .../xpack/ccr/LocalIndexFollowingIT.java      | 64 +++++++++++++++++
 .../TransportBulkShardOperationsAction.java   | 17 +++++
 9 files changed, 180 insertions(+), 70 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java
index ebfd4b66fbb..6dab3a5cb04 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java
@@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
@@ -42,9 +43,11 @@ import java.util.stream.Stream;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.greaterThan;
 
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
 public class WriteMemoryLimitsIT extends ESIntegTestCase {
 
+    public static final String INDEX_NAME = "test";
+
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
         return Settings.builder()
@@ -69,15 +72,13 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
         return 1;
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/58983")
     public void testWriteBytesAreIncremented() throws Exception {
-        final String index = "test";
-        assertAcked(prepareCreate(index, Settings.builder()
+        assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
-        ensureGreen(index);
+        ensureGreen(INDEX_NAME);
 
-        IndicesStatsResponse response = client().admin().indices().prepareStats(index).get();
+        IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get();
         String primaryId = Stream.of(response.getShards())
             .map(ShardStats::getShardRouting)
             .filter(ShardRouting::primary)
@@ -90,8 +91,10 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
             .findAny()
             .get()
             .currentNodeId();
-        String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName();
-        String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName();
+        DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
+        String primaryName = nodes.get(primaryId).getName();
+        String replicaName = nodes.get(replicaId).getName();
+        String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName();
 
         final CountDownLatch replicationSendPointReached = new CountDownLatch(1);
         final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1);
@@ -118,7 +121,7 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
         final BulkRequest bulkRequest = new BulkRequest();
         int totalRequestSize = 0;
         for (int i = 0; i < 80; ++i) {
-            IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
+            IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
                 .source(Collections.singletonMap("key", randomAlphaOfLength(50)));
             totalRequestSize += request.ramBytesUsed();
             assertTrue(request.ramBytesUsed() > request.source().length());
@@ -129,18 +132,19 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
         final long bulkShardRequestSize = totalRequestSize;
 
         try {
-            final ActionFuture<BulkResponse> successFuture = client(replicaName).bulk(bulkRequest);
+            final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
             replicationSendPointReached.await();
 
             WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
             WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
+            WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
 
-            assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize));
-            assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize));
-            assertEquals(0, primaryWriteLimits.getReplicaBytes());
-            assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
-            assertEquals(0, replicaWriteLimits.getPrimaryBytes());
-            assertEquals(0, replicaWriteLimits.getReplicaBytes());
+            assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
+            assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
+            assertEquals(0, replicaWriteLimits.getWriteBytes());
+            assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
+            assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
+            assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
 
             ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
             // Block the replica Write thread pool
@@ -163,18 +167,32 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
             newActionsSendPointReached.await();
             latchBlockingReplicationSend.countDown();
 
-            IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
+            IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
                 .source(Collections.singletonMap("key", randomAlphaOfLength(50)));
             final BulkRequest secondBulkRequest = new BulkRequest();
             secondBulkRequest.add(request);
 
-            ActionFuture<BulkResponse> secondFuture = client(replicaName).bulk(secondBulkRequest);
+            // Use the primary or the replica data node as the coordinating node this time
+            boolean usePrimaryAsCoordinatingNode = randomBoolean();
+            final ActionFuture<BulkResponse> secondFuture;
+            if (usePrimaryAsCoordinatingNode) {
+                secondFuture = client(primaryName).bulk(secondBulkRequest);
+            } else {
+                secondFuture = client(replicaName).bulk(secondBulkRequest);
+            }
 
             final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed();
             final long secondBulkShardRequestSize = request.ramBytesUsed();
 
-            assertBusy(() -> assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes()));
-            assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(),
+            if (usePrimaryAsCoordinatingNode) {
+                assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize));
+                assertEquals(0, replicaWriteLimits.getWriteBytes());
+            } else {
+                assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
+                assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes());
+            }
+            assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
+            assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(),
                 greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));
 
             latchBlockingReplication.countDown();
@@ -182,12 +200,12 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
             successFuture.actionGet();
             secondFuture.actionGet();
 
-            assertEquals(0, primaryWriteLimits.getCoordinatingBytes());
-            assertEquals(0, primaryWriteLimits.getPrimaryBytes());
-            assertEquals(0, primaryWriteLimits.getReplicaBytes());
-            assertEquals(0, replicaWriteLimits.getCoordinatingBytes());
-            assertEquals(0, replicaWriteLimits.getPrimaryBytes());
-            assertEquals(0, replicaWriteLimits.getReplicaBytes());
+            assertEquals(0, primaryWriteLimits.getWriteBytes());
+            assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
+            assertEquals(0, replicaWriteLimits.getWriteBytes());
+            assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
+            assertEquals(0, coordinatingWriteLimits.getWriteBytes());
+            assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
         } finally {
             if (replicationSendPointReached.getCount() > 0) {
                 replicationSendPointReached.countDown();
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 90b1a2628e1..425c8e36d83 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -166,7 +166,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bu
     @Override
     protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
         long indexingBytes = bulkRequest.ramBytesUsed();
-        final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes);
+        final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes);
         final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
         try {
             doInternalExecute(task, bulkRequest, releasingListener);
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java
index 84c702f1106..29371f07950 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java
@@ -25,34 +25,24 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class WriteMemoryLimits {
 
-    private final AtomicLong coordinatingBytes = new AtomicLong(0);
-    private final AtomicLong primaryBytes = new AtomicLong(0);
-    private final AtomicLong replicaBytes = new AtomicLong(0);
+    private final AtomicLong writeBytes = new AtomicLong(0);
+    private final AtomicLong replicaWriteBytes = new AtomicLong(0);
 
-    public Releasable markCoordinatingOperationStarted(long bytes) {
-        coordinatingBytes.addAndGet(bytes);
-        return () -> coordinatingBytes.getAndAdd(-bytes);
+    public Releasable markWriteOperationStarted(long bytes) {
+        writeBytes.addAndGet(bytes);
+        return () -> writeBytes.getAndAdd(-bytes);
     }
 
-    public long getCoordinatingBytes() {
-        return coordinatingBytes.get();
+    public long getWriteBytes() {
+        return writeBytes.get();
     }
 
-    public Releasable markPrimaryOperationStarted(long bytes) {
-        primaryBytes.addAndGet(bytes);
-        return () -> primaryBytes.getAndAdd(-bytes);
+    public Releasable markReplicaWriteStarted(long bytes) {
+        replicaWriteBytes.getAndAdd(bytes);
+        return () -> replicaWriteBytes.getAndAdd(-bytes);
     }
 
-    public long getPrimaryBytes() {
-        return primaryBytes.get();
-    }
-
-    public Releasable markReplicaOperationStarted(long bytes) {
-        replicaBytes.getAndAdd(bytes);
-        return () -> replicaBytes.getAndAdd(-bytes);
-    }
-
-    public long getReplicaBytes() {
-        return replicaBytes.get();
+    public long getReplicaWriteBytes() {
+        return replicaWriteBytes.get();
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
index 638371f4141..74ddcf54b32 100644
--- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
@@ -64,6 +64,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
     }
 
+    @Override
+    protected void doExecute(Task parentTask, ResyncReplicationRequest request, ActionListener<ResyncReplicationResponse> listener) {
+        assert false : "use TransportResyncReplicationAction#sync";
+    }
+
     @Override
     protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException {
         return new ResyncReplicationResponse(in);
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 2a4258c3817..ed956c7d472 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -286,7 +286,7 @@ public abstract class TransportReplicationAction<
     }
 
     protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
-        Releasable releasable = checkPrimaryLimits(request.getRequest());
+        Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute());
         ActionListener<Response> listener =
             ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close);
@@ -297,7 +297,7 @@ public abstract class TransportReplicationAction<
         }
     }
 
-    protected Releasable checkPrimaryLimits(final Request request) {
+    protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) {
         return () -> {};
     }
 
@@ -372,8 +372,7 @@ public abstract class TransportReplicationAction<
             DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
             transportService.sendRequest(relocatingNode, transportPrimaryAction,
                 new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
-                    primaryRequest.getPrimaryTerm()),
-                transportOptions,
+                    primaryRequest.getPrimaryTerm()), transportOptions,
                 new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
                     @Override
                     public void handleResponse(Response response) {
@@ -585,7 +584,7 @@ public abstract class TransportReplicationAction<
                     Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
                     AsyncReplicaAction.this.onFailure(e);
                 }));
-                // TODO: Evaludate if we still need to catch this exception
+                // TODO: Evaluate if we still need to catch this exception
             } catch (Exception e) {
                 Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
                 AsyncReplicaAction.this.onFailure(e);
@@ -751,7 +750,7 @@ public abstract class TransportReplicationAction<
                     transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
             }
             performAction(node, transportPrimaryAction, true,
-                new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id())));
+                new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true));
         }
 
         private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
@@ -1103,19 +1102,27 @@ public abstract class TransportReplicationAction<
         private final String targetAllocationID;
         private final long primaryTerm;
         private final R request;
+        // Indicates if this primary shard request originated by a reroute on this local node.
+        private final boolean sentFromLocalReroute;
 
         public ConcreteShardRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
             targetAllocationID = in.readString();
             primaryTerm = in.readVLong();
+            sentFromLocalReroute = false;
             request = requestReader.read(in);
         }
 
         public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) {
+            this(request, targetAllocationID, primaryTerm, false);
+        }
+
+        public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute) {
             Objects.requireNonNull(request);
             Objects.requireNonNull(targetAllocationID);
             this.request = request;
            this.targetAllocationID = targetAllocationID;
             this.primaryTerm = primaryTerm;
+            this.sentFromLocalReroute = sentFromLocalReroute;
         }
@@ -1144,11 +1151,19 @@ public abstract class TransportReplicationAction<
         @Override
         public void writeTo(StreamOutput out) throws IOException {
+            // If sentFromLocalReroute is marked true, then this request should just be looped back through
+            // the local transport. It should never be serialized to be sent over the wire. If it is sent over
+            // the wire, then it was NOT sent from a local reroute.
+            assert sentFromLocalReroute == false;
             out.writeString(targetAllocationID);
             out.writeVLong(primaryTerm);
             request.writeTo(out);
         }
 
+        public boolean sentFromLocalReroute() {
+            return sentFromLocalReroute;
+        }
+
         public R getRequest() {
             return request;
         }
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
index 841f60ebb29..5211a446620 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
@@ -80,12 +80,18 @@ public abstract class TransportWriteAction<
 
     @Override
     protected Releasable checkOperationLimits(Request request) {
-        return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request));
+        return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
     }
 
     @Override
-    protected Releasable checkPrimaryLimits(Request request) {
-        return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request));
+    protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) {
+        // If this primary request was submitted by a reroute performed on this local node, we have already
+        // accounted the bytes.
+        if (rerouteWasLocal) {
+            return () -> {};
+        } else {
+            return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
+        }
     }
 
     protected long primaryOperationSize(Request request) {
@@ -94,7 +100,7 @@ public abstract class TransportWriteAction<
 
     @Override
     protected Releasable checkReplicaLimits(ReplicaRequest request) {
-        return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request));
+        return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request));
     }
 
     protected long replicaOperationSize(ReplicaRequest request) {
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index 167e9c8457e..3b4e8dd660a 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -1349,19 +1349,14 @@ public final class InternalTestCluster extends TestCluster {
         assertBusy(() -> {
             for (NodeAndClient nodeAndClient : nodes.values()) {
                 WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name);
-                final long coordinatingBytes = writeMemoryLimits.getCoordinatingBytes();
-                if (coordinatingBytes > 0) {
-                    throw new AssertionError("pending coordinating write bytes [" + coordinatingBytes + "] bytes on node ["
+                final long writeBytes = writeMemoryLimits.getWriteBytes();
+                if (writeBytes > 0) {
+                    throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node ["
                         + nodeAndClient.name + "].");
                 }
-                final long primaryBytes = writeMemoryLimits.getPrimaryBytes();
-                if (primaryBytes > 0) {
-                    throw new AssertionError("pending primary write bytes [" + coordinatingBytes + "] bytes on node ["
-                        + nodeAndClient.name + "].");
-                }
-                final long replicaBytes = writeMemoryLimits.getReplicaBytes();
-                if (replicaBytes > 0) {
-                    throw new AssertionError("pending replica write bytes [" + coordinatingBytes + "] bytes on node ["
+                final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes();
+                if (replicaWriteBytes > 0) {
+                    throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node ["
                         + nodeAndClient.name + "].");
                 }
             }
diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java
index ecb875db43c..ca5a2b268b7 100644
--- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java
+++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java
@@ -6,13 +6,16 @@
 
 package org.elasticsearch.xpack.ccr;
 
+import org.elasticsearch.action.bulk.WriteMemoryLimits;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.CcrSingleNodeTestCase;
 import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
 import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
@@ -24,6 +27,8 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.StreamSupport;
 
 import static java.util.Collections.singletonMap;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -92,6 +97,65 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
         assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false));
     }
 
+    public void testWriteLimitsIncremented() throws Exception {
+        final String leaderIndexSettings = getIndexSettings(1, 0,
+            singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
+        assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
+        ensureGreen("leader");
+
+        // Use a sufficiently small number of docs to ensure that they are well below the number of docs that
+        // can be sent in a single TransportBulkShardOperationsAction
+        final long firstBatchNumDocs = randomIntBetween(10, 20);
+        long sourceSize = 0;
+        for (int i = 0; i < firstBatchNumDocs; i++) {
+            BytesArray source = new BytesArray("{}");
+            sourceSize += source.length();
+            client().prepareIndex("leader", "doc").setSource(source, XContentType.JSON).get();
+        }
+
+        ThreadPool nodeThreadPool = getInstanceFromNode(ThreadPool.class);
+        ThreadPool.Info writeInfo = StreamSupport.stream(nodeThreadPool.info().spliterator(), false)
+            .filter(i -> i.getName().equals(ThreadPool.Names.WRITE)).findAny().get();
+        int numberOfThreads = writeInfo.getMax();
+        CountDownLatch threadBlockedLatch = new CountDownLatch(numberOfThreads);
+        CountDownLatch blocker = new CountDownLatch(1);
+
+        for (int i = 0; i < numberOfThreads; ++i) {
+            nodeThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
+                try {
+                    threadBlockedLatch.countDown();
+                    blocker.await();
+                } catch (InterruptedException e) {
+                    throw new IllegalStateException(e);
+                }
+            });
+        }
+        threadBlockedLatch.await();
+
+        try {
+            final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower");
+            client().execute(PutFollowAction.INSTANCE, followRequest).get();
+
+            WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class);
+            final long finalSourceSize = sourceSize;
+            assertBusy(() -> {
+                // The actual write bytes will be greater due to other request fields. However, this test is
+                // just spot checking that the bytes are incremented at all.
+                assertTrue(memoryLimits.getWriteBytes() > finalSourceSize);
+            });
+            blocker.countDown();
+            assertBusy(() -> {
+                assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs));
+            });
+            ensureEmptyWriteBuffers();
+        } finally {
+            if (blocker.getCount() > 0) {
+                blocker.countDown();
+            }
+        }
+
+    }
+
     public void testRemoveRemoteConnection() throws Exception {
         PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
         request.setName("my_pattern");
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
index f860a28eea2..c0a2db2d3b4 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SeqNoStats;
@@ -25,6 +26,7 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException;
@@ -36,6 +38,8 @@ import java.util.List;
 
 public class TransportBulkShardOperationsAction extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
 
+    private final WriteMemoryLimits writeMemoryLimits;
+
     @Inject
     public TransportBulkShardOperationsAction(
             final Settings settings,
@@ -58,6 +62,19 @@ public class TransportBulkShardOperationsAction extends TransportWriteAction<Bu
             BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, ThreadPool.Names.WRITE, false, writeMemoryLimits);
+        this.writeMemoryLimits = writeMemoryLimits;
+    }
+
+    @Override
+    protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener) {
+        // This is executed on the follower coordinator node and we need to mark the bytes.
+        Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
+        ActionListener<BulkShardOperationsResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
+        try {
+            super.doExecute(task, request, releasingListener);
+        } catch (Exception e) {
+            releasingListener.onFailure(e);
+        }
     }
 
     @Override
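
Reviewer note (illustrative only, not part of the diff): the sketch below shows the accounting pattern the patch converges on -- mark the bytes once when a node takes ownership of a write, and hand back a Releasable that is closed just before the caller's listener fires. It assumes only the pieces visible above (WriteMemoryLimits, markWriteOperationStarted, Releasable, ActionListener.runBefore); the WriteAccountingSketch class, the executeWrite method and the Consumer-based callback are hypothetical names used for illustration, mirroring how TransportBulkAction and TransportBulkShardOperationsAction wire the release into the listener.

import java.util.function.Consumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.common.lease.Releasable;

final class WriteAccountingSketch {

    private final WriteMemoryLimits writeMemoryLimits = new WriteMemoryLimits();

    /**
     * Accounts {@code requestBytes} as in-flight "write" bytes for the duration of {@code operation}.
     * The bytes are released exactly once, when the wrapped listener completes (success or failure).
     */
    <T> void executeWrite(long requestBytes, ActionListener<T> listener, Consumer<ActionListener<T>> operation) {
        // Mark the bytes once, where this node first takes ownership of the request
        // (coordinating and primary work share a single "write" bucket after this change).
        final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(requestBytes);
        // Close the releasable just before the original listener is notified, mirroring
        // ActionListener.runBefore(listener, releasable::close) in the diff above.
        final ActionListener<T> releasingListener = ActionListener.runBefore(listener, releasable::close);
        try {
            operation.accept(releasingListener);
        } catch (Exception e) {
            releasingListener.onFailure(e);
        }
    }
}

Tying the release to the listener, rather than to a finally block, keeps the bytes accounted while the request is still in flight on the primary and replica, which is exactly what WriteMemoryLimitsIT and the CCR test above assert before the futures complete.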