diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java new file mode 100644 index 00000000000..276d3013ed9 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Stream; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.greaterThan; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2) +public class WriteMemoryLimitsIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + // Need at least two threads because we are going to block one + .put("thread_pool.write.size", 2) + .build(); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class); + } + + @Override + protected int numberOfReplicas() { + return 1; + } + + @Override + protected int numberOfShards() { + return 1; + } + + public void testWriteBytesAreIncremented() throws Exception { + final String index = "test"; + assertAcked(prepareCreate(index, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); + ensureGreen(index); + + IndicesStatsResponse response = client().admin().indices().prepareStats(index).get(); + String primaryId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(ShardRouting::primary) + .findAny() + .get() + .currentNodeId(); + String replicaId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(sr -> sr.primary() == false) + .findAny() + .get() + .currentNodeId(); + String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName(); + String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName(); + + final CountDownLatch replicationSendPointReached = new CountDownLatch(1); + final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1); + final CountDownLatch newActionsSendPointReached = new CountDownLatch(2); + final CountDownLatch latchBlockingReplication = new CountDownLatch(1); + + TransportService primaryService = internalCluster().getInstance(TransportService.class, primaryName); + final MockTransportService primaryTransportService = (MockTransportService) primaryService; + TransportService replicaService = internalCluster().getInstance(TransportService.class, replicaName); + final MockTransportService replicaTransportService = (MockTransportService) replicaService; + + primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(TransportShardBulkAction.ACTION_NAME + "[r]")) { + try { + replicationSendPointReached.countDown(); + latchBlockingReplicationSend.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + connection.sendRequest(requestId, action, request, options); + }); + + final BulkRequest bulkRequest = new BulkRequest(); + int totalRequestSize = 0; + for (int i = 0; i < 80; ++i) { + IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + totalRequestSize += request.ramBytesUsed(); + assertTrue(request.ramBytesUsed() > request.source().length()); + bulkRequest.add(request); + } + + final long bulkRequestSize = bulkRequest.ramBytesUsed(); + final long bulkShardRequestSize = totalRequestSize; + + try { + final ActionFuture successFuture = client(replicaName).bulk(bulkRequest); + replicationSendPointReached.await(); + + WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); + WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); + + assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize)); + assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getReplicaBytes()); + assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.getPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getReplicaBytes()); + + ThreadPool replicaThreadPool = replicaTransportService.getThreadPool(); + // Block the replica Write thread pool + replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + newActionsSendPointReached.countDown(); + latchBlockingReplication.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + newActionsSendPointReached.countDown(); + latchBlockingReplication.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + newActionsSendPointReached.await(); + latchBlockingReplicationSend.countDown(); + + IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + final BulkRequest secondBulkRequest = new BulkRequest(); + secondBulkRequest.add(request); + + ActionFuture secondFuture = client(replicaName).bulk(secondBulkRequest); + + final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed(); + final long secondBulkShardRequestSize = request.ramBytesUsed(); + + assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes()); + assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(), + greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); + + latchBlockingReplication.countDown(); + + successFuture.actionGet(); + secondFuture.actionGet(); + + assertEquals(0, primaryWriteLimits.getCoordinatingBytes()); + assertEquals(0, primaryWriteLimits.getPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.getPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getReplicaBytes()); + } finally { + if (replicationSendPointReached.getCount() > 0) { + replicationSendPointReached.countDown(); + } + while (newActionsSendPointReached.getCount() > 0) { + newActionsSendPointReached.countDown(); + } + if (latchBlockingReplicationSend.getCount() > 0) { + latchBlockingReplicationSend.countDown(); + } + if (latchBlockingReplication.getCount() > 0) { + latchBlockingReplication.countDown(); + } + primaryTransportService.clearAllRules(); + } + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java index 497cb8122db..5019e9cb1c1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java @@ -124,8 +124,8 @@ public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCa } @Override - protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) { - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + listener.onResponse(new ReplicaResult()); } } diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index eb0a1692b99..abe309fa766 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action; +import org.apache.lucene.util.Accountable; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; @@ -40,7 +41,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; * Generic interface to group ActionRequest, which perform writes to a single document * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest} */ -public interface DocWriteRequest extends IndicesRequest { +public interface DocWriteRequest extends IndicesRequest, Accountable { /** * Set the index for this request diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 3c36f362dd1..24f45e904e7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -94,9 +94,11 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA } @Override - protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws IOException { - executeShardOperation(shardRequest, replica); - return new ReplicaResult(); + protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + executeShardOperation(shardRequest, replica); + return new ReplicaResult(); + }); } private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index bb6b520a671..6059dbbc6af 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -64,9 +64,11 @@ public class TransportShardFlushAction } @Override - protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request, IndexShard replica) { - replica.flush(request.getRequest()); - logger.trace("{} flush request executed on replica", replica.shardId()); - return new ReplicaResult(); + protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + replica.flush(request.getRequest()); + logger.trace("{} flush request executed on replica", replica.shardId()); + return new ReplicaResult(); + }); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java index 79cc44f6995..73990cb3ab7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java @@ -100,9 +100,11 @@ public class TransportVerifyShardIndexBlockAction extends TransportReplicationAc } @Override - protected ReplicaResult shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica) { - executeShardOperation(shardRequest, replica); - return new ReplicaResult(); + protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + executeShardOperation(shardRequest, replica); + return new ReplicaResult(); + }); } private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index f90ac5cccd6..09c6712221c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -66,9 +66,12 @@ public class TransportShardRefreshAction } @Override - protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica) { - replica.refresh("api"); - logger.trace("{} refresh request executed on replica", replica.shardId()); - return new ReplicaResult(); + protected void shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + replica.refresh("api"); + logger.trace("{} refresh request executed on replica", replica.shardId()); + return new ReplicaResult(); + }); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 2708bd9cc43..2c3357fcad7 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.bulk; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -30,7 +32,9 @@ import org.elasticsearch.index.shard.ShardId; import java.io.IOException; import java.util.Objects; -public class BulkItemRequest implements Writeable { +public class BulkItemRequest implements Writeable, Accountable { + + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class); private int id; private DocWriteRequest request; @@ -115,4 +119,9 @@ public class BulkItemRequest implements Writeable { DocWriteRequest.writeDocumentRequestThin(out, request); out.writeOptionalWriteable(primaryResponse == null ? null : primaryResponse::writeThin); } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + request.ramBytesUsed(); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 775c4c8f8de..083d325aca8 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.bulk; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; @@ -56,7 +58,9 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * Note that we only support refresh on the bulk request not per item. * @see org.elasticsearch.client.Client#bulk(BulkRequest) */ -public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest { +public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest, Accountable { + + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkRequest.class); private static final int REQUEST_OVERHEAD = 50; @@ -429,4 +433,9 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques } return value; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum(); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index f67a54e4dd9..5b11dd7e54b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.bulk; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.Version; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; @@ -29,12 +31,14 @@ import org.elasticsearch.index.shard.ShardId; import java.io.IOException; import java.util.HashSet; import java.util.Set; +import java.util.stream.Stream; -public class BulkShardRequest extends ReplicatedWriteRequest { +public class BulkShardRequest extends ReplicatedWriteRequest implements Accountable { public static final Version COMPACT_SHARD_ID_VERSION = Version.V_7_9_0; + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class); - private BulkItemRequest[] items; + private final BulkItemRequest[] items; public BulkShardRequest(StreamInput in) throws IOException { super(in); @@ -143,4 +147,9 @@ public class BulkShardRequest extends ReplicatedWriteRequest { } } } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + Stream.of(items).mapToLong(Accountable::ramBytesUsed).sum(); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index e0fa03f2dce..90b1a2628e1 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -59,6 +59,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -112,23 +113,24 @@ public class TransportBulkAction extends HandledTransportAction docWriteRequest) { IndexRequest indexRequest = null; if (docWriteRequest instanceof IndexRequest) { indexRequest = (IndexRequest) docWriteRequest; @@ -162,6 +165,17 @@ public class TransportBulkAction extends HandledTransportAction listener) { + long indexingBytes = bulkRequest.ramBytesUsed(); + final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes); + final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); + try { + doInternalExecute(task, bulkRequest, releasingListener); + } catch (Exception e) { + releasingListener.onFailure(e); + } + } + + protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { final long startTime = relativeTime(); final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); @@ -749,7 +763,7 @@ public class TransportBulkAction extends HandledTransportAction> listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, @@ -136,6 +137,11 @@ public class TransportShardBulkAction extends TransportWriteAction shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { - final Translog.Location location = performOnReplica(request, replica); - return new WriteReplicaResult<>(request, location, null, replica, logger); + protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + final Translog.Location location = performOnReplica(request, replica); + return new WriteReplicaResult<>(request, location, null, replica, logger); + }); + } + + @Override + protected long replicaOperationSize(BulkShardRequest request) { + return request.ramBytesUsed(); } public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java new file mode 100644 index 00000000000..84c702f1106 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.common.lease.Releasable; + +import java.util.concurrent.atomic.AtomicLong; + +public class WriteMemoryLimits { + + private final AtomicLong coordinatingBytes = new AtomicLong(0); + private final AtomicLong primaryBytes = new AtomicLong(0); + private final AtomicLong replicaBytes = new AtomicLong(0); + + public Releasable markCoordinatingOperationStarted(long bytes) { + coordinatingBytes.addAndGet(bytes); + return () -> coordinatingBytes.getAndAdd(-bytes); + } + + public long getCoordinatingBytes() { + return coordinatingBytes.get(); + } + + public Releasable markPrimaryOperationStarted(long bytes) { + primaryBytes.addAndGet(bytes); + return () -> primaryBytes.getAndAdd(-bytes); + } + + public long getPrimaryBytes() { + return primaryBytes.get(); + } + + public Releasable markReplicaOperationStarted(long bytes) { + replicaBytes.getAndAdd(bytes); + return () -> replicaBytes.getAndAdd(-bytes); + } + + public long getReplicaBytes() { + return replicaBytes.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index a729019ba39..ae2849c39d8 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.delete; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; @@ -53,6 +54,8 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class DeleteRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(DeleteRequest.class); + private static final ShardId NO_SHARD_ID = null; // Set to null initially so we can know to override in bulk requests that have a default type. @@ -340,4 +343,9 @@ public class DeleteRequest extends ReplicatedWriteRequest public String toString() { return "delete {[" + index + "][" + type() + "][" + id + "]}"; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id); + } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index c9fe775163c..0b9ba3de6ea 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.index; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; @@ -77,6 +78,8 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; */ public class IndexRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(IndexRequest.class); + /** * Max length of the source document to include into string() * @@ -795,4 +798,9 @@ public class IndexRequest extends ReplicatedWriteRequest implement public long getAutoGeneratedTimestamp() { return autoGeneratedTimestamp; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.ramBytesUsed()); + } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 94dde512aa1..638371f4141 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.resync; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.stream.Stream; public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { @@ -54,10 +56,12 @@ public class TransportResyncReplicationAction extends TransportWriteAction> listener) { ActionListener.completeWith(listener, () -> new WritePrimaryResult<>(performOnPrimary(request), new ResyncReplicationResponse(), null, null, primary, logger)); } + @Override + protected long primaryOperationSize(ResyncReplicationRequest request) { + return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); + } + public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) { return request; } @Override - protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, - IndexShard replica) throws Exception { - Translog.Location location = performOnReplica(request, replica); - return new WriteReplicaResult<>(request, location, null, replica, logger); + protected void dispatchedShardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + Translog.Location location = performOnReplica(request, replica); + return new WriteReplicaResult<>(request, location, null, replica, logger); + }); + } + + @Override + protected long replicaOperationSize(ResyncReplicationRequest request) { + return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); } public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java index b93298eefe8..835546282d6 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java @@ -19,9 +19,6 @@ package org.elasticsearch.action.support; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; @@ -30,7 +27,6 @@ import org.elasticsearch.transport.TransportResponse; public final class ChannelActionListener< Response extends TransportResponse, Request extends TransportRequest> implements ActionListener { - private static final Logger logger = LogManager.getLogger(ChannelActionListener.class); private final TransportChannel channel; private final Request request; private final String actionName; @@ -52,12 +48,6 @@ public final class ChannelActionListener< @Override public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - e1.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage( - "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); - } + TransportChannel.sendErrorResponse(channel, actionName, request, e); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 6a4ab67bdae..2a4258c3817 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -214,13 +214,14 @@ public abstract class TransportReplicationAction< ActionListener> listener); /** - * Synchronously execute the specified replica operation. This is done under a permit from + * Execute the specified replica operation. This is done under a permit from * {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}. * * @param shardRequest the request to the replica shard * @param replica the replica shard to perform the operation on */ - protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica) throws Exception; + protected abstract void shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica, + ActionListener listener); /** * Cluster level block to check before request execution. Returning null means that no blocks need to be checked. @@ -273,13 +274,31 @@ public abstract class TransportReplicationAction< return false; } - protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) { - execute(task, request, new ChannelActionListener<>(channel, actionName, request)); + private void handleOperationRequest(final Request request, final TransportChannel channel, Task task) { + Releasable releasable = checkOperationLimits(request); + ActionListener listener = + ActionListener.runBefore(new ChannelActionListener<>(channel, actionName, request), releasable::close); + execute(task, request, listener); + } + + protected Releasable checkOperationLimits(final Request request) { + return () -> {}; } protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { - new AsyncPrimaryAction( - request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); + Releasable releasable = checkPrimaryLimits(request.getRequest()); + ActionListener listener = + ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close); + + try { + new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run(); + } catch (RuntimeException e) { + listener.onFailure(e); + } + } + + protected Releasable checkPrimaryLimits(final Request request) { + return () -> {}; } class AsyncPrimaryAction extends AbstractRunnable { @@ -490,10 +509,21 @@ public abstract class TransportReplicationAction< } } - protected void handleReplicaRequest(final ConcreteReplicaRequest replicaRequest, - final TransportChannel channel, final Task task) { - new AsyncReplicaAction( - replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run(); + protected void handleReplicaRequest(final ConcreteReplicaRequest replicaRequest, final TransportChannel channel, + final Task task) { + Releasable releasable = checkReplicaLimits(replicaRequest.getRequest()); + ActionListener listener = + ActionListener.runBefore(new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), releasable::close); + + try { + new AsyncReplicaAction(replicaRequest, listener, (ReplicationTask) task).run(); + } catch (RuntimeException e) { + listener.onFailure(e); + } + } + + protected Releasable checkReplicaLimits(final ReplicaRequest request) { + return () -> {}; } public static class RetryOnReplicaException extends ElasticsearchException { @@ -532,27 +562,31 @@ public abstract class TransportReplicationAction< @Override public void onResponse(Releasable releasable) { + assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit"; try { - assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit"; - final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica); - replicaResult.runPostReplicaActions( - ActionListener.wrap(r -> { - final TransportReplicationAction.ReplicaResponse response = - new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint()); - releasable.close(); // release shard operation lock before responding to caller - if (logger.isTraceEnabled()) { - logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, - replicaRequest.getRequest().shardId(), - replicaRequest.getRequest()); - } - setPhase(task, "finished"); - onCompletionListener.onResponse(response); - }, e -> { - Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller - this.responseWithFailure(e); - }) - ); - } catch (final Exception e) { + shardOperationOnReplica(replicaRequest.getRequest(), replica, ActionListener.wrap((replicaResult) -> + replicaResult.runPostReplicaActions( + ActionListener.wrap(r -> { + final ReplicaResponse response = + new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint()); + releasable.close(); // release shard operation lock before responding to caller + if (logger.isTraceEnabled()) { + logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, + replicaRequest.getRequest().shardId(), + replicaRequest.getRequest()); + } + setPhase(task, "finished"); + onCompletionListener.onResponse(response); + }, e -> { + Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller + responseWithFailure(e); + }) + ), e -> { + Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller + AsyncReplicaAction.this.onFailure(e); + })); + // TODO: Evaludate if we still need to catch this exception + } catch (Exception e) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 07f8e96b4e7..841f60ebb29 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -22,6 +22,8 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.WriteRequest; @@ -32,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperParsingException; @@ -57,12 +60,45 @@ public abstract class TransportWriteAction< Response extends ReplicationResponse & WriteResponse > extends TransportReplicationAction { + private final boolean forceExecutionOnPrimary; + private final WriteMemoryLimits writeMemoryLimits; + private final String executor; + protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader request, - Writeable.Reader replicaRequest, String executor, boolean forceExecutionOnPrimary) { + Writeable.Reader replicaRequest, String executor, boolean forceExecutionOnPrimary, + WriteMemoryLimits writeMemoryLimits) { + // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the + // ThreadPool.Names.WRITE thread pool in this class. super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - request, replicaRequest, executor, true, forceExecutionOnPrimary); + request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); + this.executor = executor; + this.forceExecutionOnPrimary = forceExecutionOnPrimary; + this.writeMemoryLimits = writeMemoryLimits; + } + + @Override + protected Releasable checkOperationLimits(Request request) { + return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request)); + } + + @Override + protected Releasable checkPrimaryLimits(Request request) { + return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request)); + } + + protected long primaryOperationSize(Request request) { + return 0; + } + + @Override + protected Releasable checkReplicaLimits(ReplicaRequest request) { + return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request)); + } + + protected long replicaOperationSize(ReplicaRequest request) { + return 0; } /** Syncs operation result to the translog or throws a shard not available failure */ @@ -104,18 +140,48 @@ public abstract class TransportWriteAction< * and failure async refresh is performed on the primary shard according to the Request refresh policy */ @Override - protected abstract void shardOperationOnPrimary( - Request request, IndexShard primary, ActionListener> listener); + protected void shardOperationOnPrimary( + Request request, IndexShard primary, ActionListener> listener) { + threadPool.executor(executor).execute(new ActionRunnable>(listener) { + @Override + protected void doRun() { + dispatchedShardOperationOnPrimary(request, primary, listener); + } + + @Override + public boolean isForceExecution() { + return forceExecutionOnPrimary; + } + }); + } + + protected abstract void dispatchedShardOperationOnPrimary( + Request request, IndexShard primary, ActionListener> listener); /** * Called once per replica with a reference to the replica {@linkplain IndexShard} to modify. * - * @return the result of the operation on replica, including current translog location and operation response and failure - * async refresh is performed on the replica shard according to the ReplicaRequest refresh policy + * @param listener listener for the result of the operation on replica, including current translog location and operation + * response and failure async refresh is performed on the replica shard according to the ReplicaRequest + * refresh policy */ @Override - protected abstract WriteReplicaResult shardOperationOnReplica( - ReplicaRequest request, IndexShard replica) throws Exception; + protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener listener) { + threadPool.executor(executor).execute(new ActionRunnable(listener) { + @Override + protected void doRun() { + dispatchedShardOperationOnReplica(request, replica, listener); + } + + @Override + public boolean isForceExecution() { + return true; + } + }); + } + + protected abstract void dispatchedShardOperationOnReplica( + ReplicaRequest request, IndexShard replica, ActionListener listener); /** * Result of taking the action on the primary. diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index c4dfc926537..7fc440c6673 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.update; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocWriteRequest; @@ -59,6 +60,9 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class UpdateRequest extends InstanceShardOperationRequest implements DocWriteRequest, WriteRequest, ToXContentObject { + + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(UpdateRequest.class); + private static ObjectParser PARSER; private static final ParseField SCRIPT_FIELD = new ParseField("script"); @@ -1010,4 +1014,16 @@ public class UpdateRequest extends InstanceShardOperationRequest res.append(", detect_noop[").append(detectNoop).append("]"); return res.append("}").toString(); } + + @Override + public long ramBytesUsed() { + long childRequestBytes = 0; + if (doc != null) { + childRequestBytes += doc.ramBytesUsed(); + } + if (upsertRequest != null) { + childRequestBytes += upsertRequest.ramBytesUsed(); + } + return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + childRequestBytes; + } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 785c8311d4a..9f9dc883757 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -108,9 +108,11 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction< } @Override - protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard indexShard) throws Exception { - maybeSyncTranslog(indexShard); - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + maybeSyncTranslog(replica); + return new ReplicaResult(); + }); } private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index ed0f6efaf4d..3e003fc2898 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -165,12 +164,14 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi } @Override - protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws WriteStateException { - Objects.requireNonNull(request); - Objects.requireNonNull(replica); - replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); - replica.persistRetentionLeases(); - return new ReplicaResult(); + protected void shardOperationOnReplica(Request request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + Objects.requireNonNull(request); + Objects.requireNonNull(replica); + replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); + replica.persistRetentionLeases(); + return new ReplicaResult(); + }); } public static final class Request extends ReplicationRequest { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index b595a679329..54a418fe673 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteResponse; @@ -40,7 +41,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -79,7 +79,8 @@ public class RetentionLeaseSyncAction extends final IndicesService indicesService, final ThreadPool threadPool, final ShardStateAction shardStateAction, - final ActionFilters actionFilters) { + final ActionFilters actionFilters, + final WriteMemoryLimits writeMemoryLimits) { super( settings, ACTION_NAME, @@ -91,7 +92,7 @@ public class RetentionLeaseSyncAction extends actionFilters, RetentionLeaseSyncAction.Request::new, RetentionLeaseSyncAction.Request::new, - ThreadPool.Names.MANAGEMENT, false); + ThreadPool.Names.MANAGEMENT, false, writeMemoryLimits); } @Override @@ -146,7 +147,7 @@ public class RetentionLeaseSyncAction extends } @Override - protected void shardOperationOnPrimary(Request request, IndexShard primary, + protected void dispatchedShardOperationOnPrimary(Request request, IndexShard primary, ActionListener> listener) { ActionListener.completeWith(listener, () -> { assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); @@ -158,14 +159,15 @@ public class RetentionLeaseSyncAction extends } @Override - protected WriteReplicaResult shardOperationOnReplica( - final Request request, - final IndexShard replica) throws WriteStateException { - Objects.requireNonNull(request); - Objects.requireNonNull(replica); - replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); - replica.persistRetentionLeases(); - return new WriteReplicaResult<>(request, null, null, replica, getLogger()); + protected void dispatchedShardOperationOnReplica(Request request, IndexShard replica, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + Objects.requireNonNull(request); + Objects.requireNonNull(replica); + replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); + replica.persistRetentionLeases(); + return new WriteReplicaResult<>(request, null, null, replica, getLogger()); + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index d45be0d99e8..17c86ed436b 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -31,6 +31,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchTransportService; @@ -593,6 +594,7 @@ public class Node implements Closeable { new PersistentTasksClusterService(settings, registry, clusterService, threadPool); resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); + final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(); modules.add(b -> { b.bind(Node.class).toInstance(this); @@ -611,6 +613,7 @@ public class Node implements Closeable { b.bind(ScriptService.class).toInstance(scriptService); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); + b.bind(WriteMemoryLimits.class).toInstance(bulkIndexingLimits); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TransportChannel.java index 17e538f04c4..bdec6913115 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportChannel.java @@ -19,6 +19,9 @@ package org.elasticsearch.transport; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import java.io.IOException; @@ -28,6 +31,8 @@ import java.io.IOException; */ public interface TransportChannel { + Logger logger = LogManager.getLogger(TransportChannel.class); + String getProfileName(); String getChannelType(); @@ -42,4 +47,17 @@ public interface TransportChannel { default Version getVersion() { return Version.CURRENT; } + + /** + * A helper method to send an exception and handle and log a subsequent exception + */ + static void sendErrorResponse(TransportChannel channel, String actionName, TransportRequest request, Exception e) { + try { + channel.sendResponse(e); + } catch (Exception sendException) { + sendException.addSuppressed(e); + logger.warn(() -> new ParameterizedMessage( + "Failed to send error response for action [{}] and request [{}]", actionName, request), sendException); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 164b84fa6d7..1272963a0f7 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -120,7 +120,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa final ExecutorService direct = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(direct); TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, - null, null, null, mock(ActionFilters.class), null, null) { + null, null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits()) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses, Map indicesThatCannotBeCreated) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 728793b9517..b78340e2410 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -142,7 +142,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { new AutoCreateIndex( SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new IndexNameExpressionResolver() - ) + ), new WriteMemoryLimits() ); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 44fd32a9ccf..3aec4b3b471 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -27,7 +27,9 @@ import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; @@ -78,7 +80,8 @@ public class TransportBulkActionTests extends ESTestCase { TestTransportBulkAction() { super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null, null, new ActionFilters(Collections.emptySet()), new Resolver(), - new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver())); + new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()), + new WriteMemoryLimits()); } @Override @@ -120,38 +123,36 @@ public class TransportBulkActionTests extends ESTestCase { public void testDeleteNonExistingDocDoesNotCreateIndex() throws Exception { BulkRequest bulkRequest = new BulkRequest().add(new DeleteRequest("index", "type", "id")); - bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> { - assertFalse(bulkAction.indexCreated); - BulkItemResponse[] bulkResponses = ((BulkResponse) response).getItems(); - assertEquals(bulkResponses.length, 1); - assertTrue(bulkResponses[0].isFailed()); - assertTrue(bulkResponses[0].getFailure().getCause() instanceof IndexNotFoundException); - assertEquals("index", bulkResponses[0].getFailure().getIndex()); - }, exception -> { - throw new AssertionError(exception); - })); + PlainActionFuture future = PlainActionFuture.newFuture(); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + + BulkResponse response = future.actionGet(); + assertFalse(bulkAction.indexCreated); + BulkItemResponse[] bulkResponses = ((BulkResponse) response).getItems(); + assertEquals(bulkResponses.length, 1); + assertTrue(bulkResponses[0].isFailed()); + assertTrue(bulkResponses[0].getFailure().getCause() instanceof IndexNotFoundException); + assertEquals("index", bulkResponses[0].getFailure().getIndex()); } public void testDeleteNonExistingDocExternalVersionCreatesIndex() throws Exception { BulkRequest bulkRequest = new BulkRequest() .add(new DeleteRequest("index", "type", "id").versionType(VersionType.EXTERNAL).version(0)); - bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> { - assertTrue(bulkAction.indexCreated); - }, exception -> { - throw new AssertionError(exception); - })); + PlainActionFuture future = PlainActionFuture.newFuture(); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + future.actionGet(); + assertTrue(bulkAction.indexCreated); } public void testDeleteNonExistingDocExternalGteVersionCreatesIndex() throws Exception { BulkRequest bulkRequest = new BulkRequest() .add(new DeleteRequest("index2", "type", "id").versionType(VersionType.EXTERNAL_GTE).version(0)); - bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> { - assertTrue(bulkAction.indexCreated); - }, exception -> { - throw new AssertionError(exception); - })); + PlainActionFuture future = PlainActionFuture.newFuture(); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + future.actionGet(); + assertTrue(bulkAction.indexCreated); } public void testGetIndexWriteRequest() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 5cbc4552afc..6c3e8518cf2 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -240,6 +240,7 @@ public class TransportBulkActionTookTests extends ESTestCase { actionFilters, indexNameExpressionResolver, autoCreateIndex, + new WriteMemoryLimits(), relativeTimeProvider); } diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 7210a35be39..5e57eb6045f 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.resync; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; @@ -143,7 +144,8 @@ public class TransportResyncReplicationActionTests extends ESTestCase { when(indexServices.indexServiceSafe(eq(index))).thenReturn(indexService); final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService, - clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>())); + clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), + new WriteMemoryLimits()); assertThat(action.globalBlockLevel(), nullValue()); assertThat(action.indexBlockLevel(), nullValue()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index fd6cacf7739..25f27d2abfb 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -921,14 +921,17 @@ public class TransportReplicationActionTests extends ESTestCase { final ReplicationTask task = maybeTask(); TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { + @Override - protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { - assertIndexShardCounter(1); - assertPhase(task, "replica"); - if (throwException) { - throw new ElasticsearchException("simulated"); - } - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + assertIndexShardCounter(1); + assertPhase(task, "replica"); + if (throwException) { + throw new ElasticsearchException("simulated"); + } + return new ReplicaResult(); + }); } }; try { @@ -1057,12 +1060,14 @@ public class TransportReplicationActionTests extends ESTestCase { TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { @Override - protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { - assertPhase(task, "replica"); - if (throwException.get()) { - throw new RetryOnReplicaException(shardId, "simulation"); - } - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + assertPhase(task, "replica"); + if (throwException.get()) { + throw new RetryOnReplicaException(shardId, "simulation"); + } + return new ReplicaResult(); + }); } }; final PlainActionFuture listener = new PlainActionFuture<>(); @@ -1124,13 +1129,15 @@ public class TransportReplicationActionTests extends ESTestCase { TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { @Override - protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { - assertPhase(task, "replica"); - if (throwException.get()) { - throw new RetryOnReplicaException(shardId, "simulation"); - } - calledSuccessfully.set(true); - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + assertPhase(task, "replica"); + if (throwException.get()) { + throw new RetryOnReplicaException(shardId, "simulation"); + } + calledSuccessfully.set(true); + return new ReplicaResult(); + }); } }; final PlainActionFuture listener = new PlainActionFuture<>(); @@ -1282,9 +1289,9 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { + protected void shardOperationOnReplica(Request request, IndexShard replica, ActionListener listener) { request.processedOnReplicas.incrementAndGet(); - return new ReplicaResult(); + listener.onResponse(new ReplicaResult()); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 0c365779794..14da9b72eea 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -446,14 +446,14 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe } @Override - protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2").getId(), - shard.routingEntry().currentNodeId()); + replica.routingEntry().currentNodeId()); executedOnReplica.set(true); // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the replica shard - assertSame(replica, shard); - return new ReplicaResult(); + assertSame(replica, replica); + listener.onResponse(new ReplicaResult()); } } @@ -505,10 +505,10 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe } @Override - protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { assertNoBlocks("block must not exist when executing the operation on replica shard: it should have been blocked before"); - assertThat(shard.getActiveOperationsCount(), greaterThan(0)); - return super.shardOperationOnReplica(shardRequest, shard); + assertThat(replica.getActiveOperationsCount(), greaterThan(0)); + super.shardOperationOnReplica(shardRequest, replica, listener); } private void assertNoBlocks(final String error) { @@ -551,9 +551,9 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe } @Override - protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { - assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, shard.getActiveOperationsCount()); - return super.shardOperationOnReplica(shardRequest, shard); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, replica.getActiveOperationsCount()); + super.shardOperationOnReplica(shardRequest, replica, listener); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 42a696c3b25..c1ef04b10d4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; @@ -137,7 +138,7 @@ public class TransportWriteActionTests extends ESTestCase { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit TestAction testAction = new TestAction(); - testAction.shardOperationOnPrimary(request, indexShard, + testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful)); @@ -152,8 +153,9 @@ public class TransportWriteActionTests extends ESTestCase { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit TestAction testAction = new TestAction(); - TransportWriteAction.WriteReplicaResult result = - testAction.shardOperationOnReplica(request, indexShard); + final PlainActionFuture future = PlainActionFuture.newFuture(); + testAction.dispatchedShardOperationOnReplica(request, indexShard, future); + final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE)); assertNotNull(listener.response); @@ -166,7 +168,7 @@ public class TransportWriteActionTests extends ESTestCase { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); TestAction testAction = new TestAction(); - testAction.shardOperationOnPrimary(request, indexShard, + testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful)); @@ -182,8 +184,9 @@ public class TransportWriteActionTests extends ESTestCase { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); TestAction testAction = new TestAction(); - TransportWriteAction.WriteReplicaResult result = - testAction.shardOperationOnReplica(request, indexShard); + final PlainActionFuture future = PlainActionFuture.newFuture(); + testAction.dispatchedShardOperationOnReplica(request, indexShard, future); + final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE)); assertNotNull(listener.response); @@ -197,7 +200,7 @@ public class TransportWriteActionTests extends ESTestCase { request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); TestAction testAction = new TestAction(); - testAction.shardOperationOnPrimary(request, indexShard, + testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful)); @@ -221,7 +224,9 @@ public class TransportWriteActionTests extends ESTestCase { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); TestAction testAction = new TestAction(); - TransportWriteAction.WriteReplicaResult result = testAction.shardOperationOnReplica(request, indexShard); + final PlainActionFuture future = PlainActionFuture.newFuture(); + testAction.dispatchedShardOperationOnReplica(request, indexShard, future); + final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE)); assertNull(listener.response); // Haven't responded yet @@ -240,7 +245,7 @@ public class TransportWriteActionTests extends ESTestCase { public void testDocumentFailureInShardOperationOnPrimary() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(true, true); - testAction.shardOperationOnPrimary(request, indexShard, + testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful)); @@ -252,8 +257,9 @@ public class TransportWriteActionTests extends ESTestCase { public void testDocumentFailureInShardOperationOnReplica() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(randomBoolean(), true); - TransportWriteAction.WriteReplicaResult result = - testAction.shardOperationOnReplica(request, indexShard); + final PlainActionFuture future = PlainActionFuture.newFuture(); + testAction.dispatchedShardOperationOnReplica(request, indexShard, future); + final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE)); assertNull(listener.response); @@ -360,7 +366,8 @@ public class TransportWriteActionTests extends ESTestCase { super(Settings.EMPTY, "internal:test", new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null, - new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false); + new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, + new WriteMemoryLimits()); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -369,7 +376,8 @@ public class TransportWriteActionTests extends ESTestCase { ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, - new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false); + new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, + new WriteMemoryLimits()); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } @@ -381,7 +389,7 @@ public class TransportWriteActionTests extends ESTestCase { } @Override - protected void shardOperationOnPrimary( + protected void dispatchedShardOperationOnPrimary( TestRequest request, IndexShard primary, ActionListener> listener) { ActionListener.completeWith(listener, () -> { if (withDocumentFailureOnPrimary) { @@ -393,14 +401,16 @@ public class TransportWriteActionTests extends ESTestCase { } @Override - protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception { - final WriteReplicaResult replicaResult; - if (withDocumentFailureOnReplica) { - replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger); - } else { - replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger); - } - return replicaResult; + protected void dispatchedShardOperationOnReplica(TestRequest request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + final WriteReplicaResult replicaResult; + if (withDocumentFailureOnReplica) { + replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger); + } else { + replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger); + } + return replicaResult; + }); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index c22011ac71b..bc44ae0c1d1 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -116,7 +116,8 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase { if (randomBoolean()) { action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {})); } else { - action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard); + action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard, + ActionTestUtils.assertNoFailureListener(r -> {})); } if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index e380d3431b3..0d8607ab01c 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; @@ -149,7 +150,9 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase { final RetentionLeaseBackgroundSyncAction.Request request = new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases); - final TransportReplicationAction.ReplicaResult result = action.shardOperationOnReplica(request, indexShard); + final PlainActionFuture listener = PlainActionFuture.newFuture(); + action.shardOperationOnReplica(request, indexShard, listener); + final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); // the retention leases on the shard should be persisted diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 4578cdd770f..6c77999191f 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -20,9 +20,11 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -102,10 +104,11 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet())); + new ActionFilters(Collections.emptySet()), + new WriteMemoryLimits()); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); - action.shardOperationOnPrimary(request, indexShard, + action.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { // the retention leases on the shard should be persisted verify(indexShard).persistRetentionLeases(); @@ -138,12 +141,14 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet())); + new ActionFilters(Collections.emptySet()), + new WriteMemoryLimits()); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); - final TransportWriteAction.WriteReplicaResult result = - action.shardOperationOnReplica(request, indexShard); + PlainActionFuture listener = PlainActionFuture.newFuture(); + action.dispatchedShardOperationOnReplica(request, indexShard, listener); + final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); // the retention leases on the shard should be persisted @@ -176,7 +181,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet())); + new ActionFilters(Collections.emptySet()), + new WriteMemoryLimits()); assertNull(action.indexBlockLevel()); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 6a1f0d36484..cdbc15b4448 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -65,6 +65,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAct import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction; import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; @@ -1484,7 +1485,8 @@ public class SnapshotResiliencyTests extends ESTestCase { indicesService, threadPool, shardStateAction, - actionFilters)), + actionFilters, + new WriteMemoryLimits())), new GlobalCheckpointSyncAction( settings, transportService, @@ -1510,7 +1512,7 @@ public class SnapshotResiliencyTests extends ESTestCase { mappingUpdatedAction.setClient(client); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), - actionFilters); + actionFilters, new WriteMemoryLimits()); actions.put(BulkAction.INSTANCE, new TransportBulkAction(threadPool, transportService, clusterService, new IngestService( @@ -1518,7 +1520,7 @@ public class SnapshotResiliencyTests extends ESTestCase { new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), Collections.emptyList(), client), transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, - new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver) + new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new WriteMemoryLimits() )); final RestoreService restoreService = new RestoreService( clusterService, repositoriesService, allocationService, diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java index 55c1acabd3f..98d92ae4bd7 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.tasks.Task; import static org.elasticsearch.action.support.PlainActionFuture.newFuture; @@ -37,6 +38,16 @@ public class ActionTestUtils { return future.actionGet(); } + /** + * Executes the given action. + * + * This is a shim method to make execution publicly available in tests. + */ + public static + void execute(TransportAction action, Task task, Request request, ActionListener listener) { + action.execute(task, request, listener); + } + public static ActionListener assertNoFailureListener(CheckedConsumer consumer) { return ActionListener.wrap(consumer, e -> { throw new AssertionError(e); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index af5b3b20eb3..f860a28eea2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.ccr.action.bulk; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -43,7 +44,8 @@ public class TransportBulkShardOperationsAction final IndicesService indicesService, final ThreadPool threadPool, final ShardStateAction shardStateAction, - final ActionFilters actionFilters) { + final ActionFilters actionFilters, + final WriteMemoryLimits writeMemoryLimits) { super( settings, BulkShardOperationsAction.NAME, @@ -55,11 +57,11 @@ public class TransportBulkShardOperationsAction actionFilters, BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, - ThreadPool.Names.WRITE, false); + ThreadPool.Names.WRITE, false, writeMemoryLimits); } @Override - protected void shardOperationOnPrimary(BulkShardOperationsRequest request, IndexShard primary, + protected void dispatchedShardOperationOnPrimary(BulkShardOperationsRequest request, IndexShard primary, ActionListener> listener) { if (logger.isTraceEnabled()) { logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry()); @@ -68,6 +70,11 @@ public class TransportBulkShardOperationsAction request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger)); } + @Override + protected long primaryOperationSize(BulkShardOperationsRequest request) { + return request.getOperations().stream().mapToLong(Translog.Operation::estimateSize).sum(); + } + public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { final Translog.Operation operationWithPrimaryTerm; switch (operation.opType()) { @@ -160,12 +167,19 @@ public class TransportBulkShardOperationsAction } @Override - protected WriteReplicaResult shardOperationOnReplica( - final BulkShardOperationsRequest request, final IndexShard replica) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("index [{}] on the following replica shard {}", request.getOperations(), replica.routingEntry()); - } - return shardOperationOnReplica(request, replica, logger); + protected void dispatchedShardOperationOnReplica(BulkShardOperationsRequest request, IndexShard replica, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + if (logger.isTraceEnabled()) { + logger.trace("index [{}] on the following replica shard {}", request.getOperations(), replica.routingEntry()); + } + return shardOperationOnReplica(request, replica, logger); + }); + } + + @Override + protected long replicaOperationSize(BulkShardOperationsRequest request) { + return request.getOperations().stream().mapToLong(Translog.Operation::estimateSize).sum(); } // public for testing purposes only