diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java index ebfd4b66fbb..6dab3a5cb04 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -42,9 +43,11 @@ import java.util.stream.Stream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.greaterThan; -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2) +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D) public class WriteMemoryLimitsIT extends ESIntegTestCase { + public static final String INDEX_NAME = "test"; + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -69,15 +72,13 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { return 1; } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/58983") public void testWriteBytesAreIncremented() throws Exception { - final String index = "test"; - assertAcked(prepareCreate(index, Settings.builder() + assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); - ensureGreen(index); + ensureGreen(INDEX_NAME); - IndicesStatsResponse response = client().admin().indices().prepareStats(index).get(); + IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get(); String primaryId = Stream.of(response.getShards()) .map(ShardStats::getShardRouting) .filter(ShardRouting::primary) @@ -90,8 +91,10 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { .findAny() .get() .currentNodeId(); - String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName(); - String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName(); + DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes(); + String primaryName = nodes.get(primaryId).getName(); + String replicaName = nodes.get(replicaId).getName(); + String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName(); final CountDownLatch replicationSendPointReached = new CountDownLatch(1); final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1); @@ -118,7 +121,7 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { final BulkRequest bulkRequest = new BulkRequest(); int totalRequestSize = 0; for (int i = 0; i < 80; ++i) { - IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) .source(Collections.singletonMap("key", randomAlphaOfLength(50))); totalRequestSize += request.ramBytesUsed(); assertTrue(request.ramBytesUsed() > request.source().length()); @@ -129,18 +132,19 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { final long bulkShardRequestSize = totalRequestSize; try { - final ActionFuture successFuture = client(replicaName).bulk(bulkRequest); + final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); replicationSendPointReached.await(); WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); + WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); - assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize)); - assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaBytes()); - assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getReplicaBytes()); + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); ThreadPool replicaThreadPool = replicaTransportService.getThreadPool(); // Block the replica Write thread pool @@ -163,18 +167,32 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { newActionsSendPointReached.await(); latchBlockingReplicationSend.countDown(); - IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) .source(Collections.singletonMap("key", randomAlphaOfLength(50))); final BulkRequest secondBulkRequest = new BulkRequest(); secondBulkRequest.add(request); - ActionFuture secondFuture = client(replicaName).bulk(secondBulkRequest); + // Use the primary or the replica data node as the coordinating node this time + boolean usePrimaryAsCoordinatingNode = randomBoolean(); + final ActionFuture secondFuture; + if (usePrimaryAsCoordinatingNode) { + secondFuture = client(primaryName).bulk(secondBulkRequest); + } else { + secondFuture = client(replicaName).bulk(secondBulkRequest); + } final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed(); final long secondBulkShardRequestSize = request.ramBytesUsed(); - assertBusy(() -> assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes())); - assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(), + if (usePrimaryAsCoordinatingNode) { + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize)); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + } else { + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes()); + } + assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); + assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); latchBlockingReplication.countDown(); @@ -182,12 +200,12 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { successFuture.actionGet(); secondFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getCoordinatingBytes()); - assertEquals(0, primaryWriteLimits.getPrimaryBytes()); - assertEquals(0, primaryWriteLimits.getReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getReplicaBytes()); + assertEquals(0, primaryWriteLimits.getWriteBytes()); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); } finally { if (replicationSendPointReached.getCount() > 0) { replicationSendPointReached.countDown(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 90b1a2628e1..425c8e36d83 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -166,7 +166,7 @@ public class TransportBulkAction extends HandledTransportAction listener) { long indexingBytes = bulkRequest.ramBytesUsed(); - final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes); + final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes); final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { doInternalExecute(task, bulkRequest, releasingListener); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java index 84c702f1106..29371f07950 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java @@ -25,34 +25,24 @@ import java.util.concurrent.atomic.AtomicLong; public class WriteMemoryLimits { - private final AtomicLong coordinatingBytes = new AtomicLong(0); - private final AtomicLong primaryBytes = new AtomicLong(0); - private final AtomicLong replicaBytes = new AtomicLong(0); + private final AtomicLong writeBytes = new AtomicLong(0); + private final AtomicLong replicaWriteBytes = new AtomicLong(0); - public Releasable markCoordinatingOperationStarted(long bytes) { - coordinatingBytes.addAndGet(bytes); - return () -> coordinatingBytes.getAndAdd(-bytes); + public Releasable markWriteOperationStarted(long bytes) { + writeBytes.addAndGet(bytes); + return () -> writeBytes.getAndAdd(-bytes); } - public long getCoordinatingBytes() { - return coordinatingBytes.get(); + public long getWriteBytes() { + return writeBytes.get(); } - public Releasable markPrimaryOperationStarted(long bytes) { - primaryBytes.addAndGet(bytes); - return () -> primaryBytes.getAndAdd(-bytes); + public Releasable markReplicaWriteStarted(long bytes) { + replicaWriteBytes.getAndAdd(bytes); + return () -> replicaWriteBytes.getAndAdd(-bytes); } - public long getPrimaryBytes() { - return primaryBytes.get(); - } - - public Releasable markReplicaOperationStarted(long bytes) { - replicaBytes.getAndAdd(bytes); - return () -> replicaBytes.getAndAdd(-bytes); - } - - public long getReplicaBytes() { - return replicaBytes.get(); + public long getReplicaWriteBytes() { + return replicaWriteBytes.get(); } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 638371f4141..74ddcf54b32 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -64,6 +64,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction listener) { + assert false : "use TransportResyncReplicationAction#sync"; + } + @Override protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException { return new ResyncReplicationResponse(in); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2a4258c3817..ed956c7d472 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -286,7 +286,7 @@ public abstract class TransportReplicationAction< } protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { - Releasable releasable = checkPrimaryLimits(request.getRequest()); + Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute()); ActionListener listener = ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close); @@ -297,7 +297,7 @@ public abstract class TransportReplicationAction< } } - protected Releasable checkPrimaryLimits(final Request request) { + protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) { return () -> {}; } @@ -372,8 +372,7 @@ public abstract class TransportReplicationAction< DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), - primaryRequest.getPrimaryTerm()), - transportOptions, + primaryRequest.getPrimaryTerm()), transportOptions, new ActionListenerResponseHandler(onCompletionListener, reader) { @Override public void handleResponse(Response response) { @@ -585,7 +584,7 @@ public abstract class TransportReplicationAction< Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); })); - // TODO: Evaludate if we still need to catch this exception + // TODO: Evaluate if we still need to catch this exception } catch (Exception e) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); @@ -751,7 +750,7 @@ public abstract class TransportReplicationAction< transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId()); } performAction(node, transportPrimaryAction, true, - new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()))); + new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true)); } private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) { @@ -1103,19 +1102,27 @@ public abstract class TransportReplicationAction< private final String targetAllocationID; private final long primaryTerm; private final R request; + // Indicates if this primary shard request originated by a reroute on this local node. + private final boolean sentFromLocalReroute; public ConcreteShardRequest(Writeable.Reader requestReader, StreamInput in) throws IOException { targetAllocationID = in.readString(); primaryTerm = in.readVLong(); + sentFromLocalReroute = false; request = requestReader.read(in); } public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) { + this(request, targetAllocationID, primaryTerm, false); + } + + public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute) { Objects.requireNonNull(request); Objects.requireNonNull(targetAllocationID); this.request = request; this.targetAllocationID = targetAllocationID; this.primaryTerm = primaryTerm; + this.sentFromLocalReroute = sentFromLocalReroute; } @Override @@ -1144,11 +1151,19 @@ public abstract class TransportReplicationAction< @Override public void writeTo(StreamOutput out) throws IOException { + // If sentFromLocalReroute is marked true, then this request should just be looped back through + // the local transport. It should never be serialized to be sent over the wire. If it is sent over + // the wire, then it was NOT sent from a local reroute. + assert sentFromLocalReroute == false; out.writeString(targetAllocationID); out.writeVLong(primaryTerm); request.writeTo(out); } + public boolean sentFromLocalReroute() { + return sentFromLocalReroute; + } + public R getRequest() { return request; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 841f60ebb29..5211a446620 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -80,12 +80,18 @@ public abstract class TransportWriteAction< @Override protected Releasable checkOperationLimits(Request request) { - return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request)); + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); } @Override - protected Releasable checkPrimaryLimits(Request request) { - return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request)); + protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) { + // If this primary request was submitted by a reroute performed on this local node, we have already + // accounted the bytes. + if (rerouteWasLocal) { + return () -> {}; + } else { + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + } } protected long primaryOperationSize(Request request) { @@ -94,7 +100,7 @@ public abstract class TransportWriteAction< @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request)); + return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request)); } protected long replicaOperationSize(ReplicaRequest request) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 167e9c8457e..3b4e8dd660a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1349,19 +1349,14 @@ public final class InternalTestCluster extends TestCluster { assertBusy(() -> { for (NodeAndClient nodeAndClient : nodes.values()) { WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name); - final long coordinatingBytes = writeMemoryLimits.getCoordinatingBytes(); - if (coordinatingBytes > 0) { - throw new AssertionError("pending coordinating write bytes [" + coordinatingBytes + "] bytes on node [" + final long writeBytes = writeMemoryLimits.getWriteBytes(); + if (writeBytes > 0) { + throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); } - final long primaryBytes = writeMemoryLimits.getPrimaryBytes(); - if (primaryBytes > 0) { - throw new AssertionError("pending primary write bytes [" + coordinatingBytes + "] bytes on node [" - + nodeAndClient.name + "]."); - } - final long replicaBytes = writeMemoryLimits.getReplicaBytes(); - if (replicaBytes > 0) { - throw new AssertionError("pending replica write bytes [" + coordinatingBytes + "] bytes on node [" + final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes(); + if (replicaWriteBytes > 0) { + throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); } } diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index ecb875db43c..ca5a2b268b7 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,13 +6,16 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.CcrSingleNodeTestCase; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; @@ -24,6 +27,8 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.stream.StreamSupport; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -92,6 +97,65 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false)); } + public void testWriteLimitsIncremented() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader"); + + // Use a sufficiently small number of docs to ensure that they are well below the number of docs that + // can be sent in a single TransportBulkShardOperationsAction + final long firstBatchNumDocs = randomIntBetween(10, 20); + long sourceSize = 0; + for (int i = 0; i < firstBatchNumDocs; i++) { + BytesArray source = new BytesArray("{}"); + sourceSize += source.length(); + client().prepareIndex("leader", "doc").setSource(source, XContentType.JSON).get(); + } + + ThreadPool nodeThreadPool = getInstanceFromNode(ThreadPool.class); + ThreadPool.Info writeInfo = StreamSupport.stream(nodeThreadPool.info().spliterator(), false) + .filter(i -> i.getName().equals(ThreadPool.Names.WRITE)).findAny().get(); + int numberOfThreads = writeInfo.getMax(); + CountDownLatch threadBlockedLatch = new CountDownLatch(numberOfThreads); + CountDownLatch blocker = new CountDownLatch(1); + + for (int i = 0; i < numberOfThreads; ++i) { + nodeThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + threadBlockedLatch.countDown(); + blocker.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + } + threadBlockedLatch.await(); + + try { + final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class); + final long finalSourceSize = sourceSize; + assertBusy(() -> { + // The actual write bytes will be greater due to other request fields. However, this test is + // just spot checking that the bytes are incremented at all. + assertTrue(memoryLimits.getWriteBytes() > finalSourceSize); + }); + blocker.countDown(); + assertBusy(() -> { + assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs)); + }); + ensureEmptyWriteBuffers(); + } finally { + if (blocker.getCount() > 0) { + blocker.countDown(); + } + } + + } + public void testRemoveRemoteConnection() throws Exception { PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); request.setName("my_pattern"); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index f860a28eea2..c0a2db2d3b4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SeqNoStats; @@ -25,6 +26,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException; @@ -36,6 +38,8 @@ import java.util.List; public class TransportBulkShardOperationsAction extends TransportWriteAction { + private final WriteMemoryLimits writeMemoryLimits; + @Inject public TransportBulkShardOperationsAction( final Settings settings, @@ -58,6 +62,19 @@ public class TransportBulkShardOperationsAction BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, ThreadPool.Names.WRITE, false, writeMemoryLimits); + this.writeMemoryLimits = writeMemoryLimits; + } + + @Override + protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener listener) { + // This is executed on the follower coordinator node and we need to mark the bytes. + Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); + try { + super.doExecute(task, request, releasingListener); + } catch (Exception e) { + releasingListener.onFailure(e); + } } @Override