Add memory tracking to queued write operations (#58957)

Currently we do not track the memory consumed by in-process write
operations.

This commit adds a mechanism to track write operation memory usage.
Tim Brooks 2020-07-02 14:14:57 -06:00 committed by GitHub
parent a4e08acdd1
commit 1ef2cd7f1a
39 changed files with 761 additions and 211 deletions
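For orientation before the diff: the new mechanism is a per-node accounting object that tracks the bytes of in-flight write operations in three buckets (coordinating, primary, and replica). Marking an operation started increments the matching counter and hands back a handle that decrements it again exactly once when closed. Below is a minimal, self-contained sketch of that pattern; Tracker and Handle are illustrative names, not the actual types (the real ones in the diff are WriteMemoryLimits and org.elasticsearch.common.lease.Releasable), and only the coordinating bucket is shown.

import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of the accounting pattern this commit introduces.
public class WriteTrackingSketch {

    // Like Releasable: an AutoCloseable whose close() throws nothing.
    interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    static final class Tracker {
        private final AtomicLong coordinatingBytes = new AtomicLong();

        // Increment on start; the returned handle decrements exactly once
        // when the operation (including sending its response) finishes.
        Handle markCoordinatingOperationStarted(long bytes) {
            coordinatingBytes.addAndGet(bytes);
            return () -> coordinatingBytes.addAndGet(-bytes);
        }

        long coordinatingBytes() {
            return coordinatingBytes.get();
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker();
        long requestBytes = 1024; // e.g. BulkRequest#ramBytesUsed() in the diff
        try (Handle ignored = tracker.markCoordinatingOperationStarted(requestBytes)) {
            assert tracker.coordinatingBytes() == requestBytes;
        }
        assert tracker.coordinatingBytes() == 0; // released on close
    }
}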


@ -0,0 +1,206 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
public class WriteMemoryLimitsIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// Need at least two threads because we are going to block one
.put("thread_pool.write.size", 2)
.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
}
@Override
protected int numberOfReplicas() {
return 1;
}
@Override
protected int numberOfShards() {
return 1;
}
public void testWriteBytesAreIncremented() throws Exception {
final String index = "test";
assertAcked(prepareCreate(index, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
ensureGreen(index);
IndicesStatsResponse response = client().admin().indices().prepareStats(index).get();
String primaryId = Stream.of(response.getShards())
.map(ShardStats::getShardRouting)
.filter(ShardRouting::primary)
.findAny()
.get()
.currentNodeId();
String replicaId = Stream.of(response.getShards())
.map(ShardStats::getShardRouting)
.filter(sr -> sr.primary() == false)
.findAny()
.get()
.currentNodeId();
String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName();
String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName();
final CountDownLatch replicationSendPointReached = new CountDownLatch(1);
final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1);
final CountDownLatch newActionsSendPointReached = new CountDownLatch(2);
final CountDownLatch latchBlockingReplication = new CountDownLatch(1);
TransportService primaryService = internalCluster().getInstance(TransportService.class, primaryName);
final MockTransportService primaryTransportService = (MockTransportService) primaryService;
TransportService replicaService = internalCluster().getInstance(TransportService.class, replicaName);
final MockTransportService replicaTransportService = (MockTransportService) replicaService;
primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(TransportShardBulkAction.ACTION_NAME + "[r]")) {
try {
replicationSendPointReached.countDown();
latchBlockingReplicationSend.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
connection.sendRequest(requestId, action, request, options);
});
final BulkRequest bulkRequest = new BulkRequest();
int totalRequestSize = 0;
for (int i = 0; i < 80; ++i) {
IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
totalRequestSize += request.ramBytesUsed();
assertTrue(request.ramBytesUsed() > request.source().length());
bulkRequest.add(request);
}
final long bulkRequestSize = bulkRequest.ramBytesUsed();
final long bulkShardRequestSize = totalRequestSize;
try {
final ActionFuture<BulkResponse> successFuture = client(replicaName).bulk(bulkRequest);
replicationSendPointReached.await();
WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getReplicaBytes());
assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());
ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
// Block the replica Write thread pool
replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
try {
newActionsSendPointReached.countDown();
latchBlockingReplication.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
try {
newActionsSendPointReached.countDown();
latchBlockingReplication.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
newActionsSendPointReached.await();
latchBlockingReplicationSend.countDown();
IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
final BulkRequest secondBulkRequest = new BulkRequest();
secondBulkRequest.add(request);
ActionFuture<BulkResponse> secondFuture = client(replicaName).bulk(secondBulkRequest);
final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed();
final long secondBulkShardRequestSize = request.ramBytesUsed();
assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(),
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));
latchBlockingReplication.countDown();
successFuture.actionGet();
secondFuture.actionGet();
assertEquals(0, primaryWriteLimits.getCoordinatingBytes());
assertEquals(0, primaryWriteLimits.getPrimaryBytes());
assertEquals(0, primaryWriteLimits.getReplicaBytes());
assertEquals(0, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());
} finally {
if (replicationSendPointReached.getCount() > 0) {
replicationSendPointReached.countDown();
}
while (newActionsSendPointReached.getCount() > 0) {
newActionsSendPointReached.countDown();
}
if (latchBlockingReplicationSend.getCount() > 0) {
latchBlockingReplicationSend.countDown();
}
if (latchBlockingReplication.getCount() > 0) {
latchBlockingReplication.countDown();
}
primaryTransportService.clearAllRules();
}
}
}


@ -124,8 +124,8 @@ public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCa
}
@Override
-protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) {
-return new ReplicaResult();
+protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
+listener.onResponse(new ReplicaResult());
}
}


@ -18,6 +18,7 @@
*/
package org.elasticsearch.action;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
@ -40,7 +41,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
* Generic interface to group ActionRequest, which perform writes to a single document
* Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest}
*/
-public interface DocWriteRequest<T> extends IndicesRequest {
+public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
/**
* Set the index for this request


@ -94,9 +94,11 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
}
@Override
-protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws IOException {
-executeShardOperation(shardRequest, replica);
-return new ReplicaResult();
+protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
+ActionListener.completeWith(listener, () -> {
+executeShardOperation(shardRequest, replica);
+return new ReplicaResult();
+});
}
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException {


@ -64,9 +64,11 @@ public class TransportShardFlushAction
}
@Override
-protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request, IndexShard replica) {
-replica.flush(request.getRequest());
-logger.trace("{} flush request executed on replica", replica.shardId());
-return new ReplicaResult();
+protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
+ActionListener.completeWith(listener, () -> {
+replica.flush(request.getRequest());
+logger.trace("{} flush request executed on replica", replica.shardId());
+return new ReplicaResult();
+});
}
}


@ -100,9 +100,11 @@ public class TransportVerifyShardIndexBlockAction extends TransportReplicationAc
}
@Override
-protected ReplicaResult shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica) {
-executeShardOperation(shardRequest, replica);
-return new ReplicaResult();
+protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
+ActionListener.completeWith(listener, () -> {
+executeShardOperation(shardRequest, replica);
+return new ReplicaResult();
+});
}
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {


@ -66,9 +66,12 @@ public class TransportShardRefreshAction
}
@Override
-protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica) {
-replica.refresh("api");
-logger.trace("{} refresh request executed on replica", replica.shardId());
-return new ReplicaResult();
+protected void shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica,
+ActionListener<ReplicaResult> listener) {
+ActionListener.completeWith(listener, () -> {
+replica.refresh("api");
+logger.trace("{} refresh request executed on replica", replica.shardId());
+return new ReplicaResult();
+});
}
}


@ -19,6 +19,8 @@
package org.elasticsearch.action.bulk;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -30,7 +32,9 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Objects;
-public class BulkItemRequest implements Writeable {
+public class BulkItemRequest implements Writeable, Accountable {
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class);
private int id;
private DocWriteRequest<?> request;
@ -115,4 +119,9 @@ public class BulkItemRequest implements Writeable {
DocWriteRequest.writeDocumentRequestThin(out, request);
out.writeOptionalWriteable(primaryResponse == null ? null : primaryResponse::writeThin);
}
@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + request.ramBytesUsed();
}
}


@ -19,6 +19,8 @@
package org.elasticsearch.action.bulk;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
@ -56,7 +58,9 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* Note that we only support refresh on the bulk request not per item.
* @see org.elasticsearch.client.Client#bulk(BulkRequest)
*/
-public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest<BulkRequest> {
+public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest<BulkRequest>, Accountable {
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkRequest.class);
private static final int REQUEST_OVERHEAD = 50;
@ -429,4 +433,9 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
}
return value;
}
@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum();
}
}


@ -19,6 +19,8 @@
package org.elasticsearch.action.bulk;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
@ -29,12 +31,14 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;
-public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
+public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> implements Accountable {
public static final Version COMPACT_SHARD_ID_VERSION = Version.V_7_9_0;
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class);
-private BulkItemRequest[] items;
+private final BulkItemRequest[] items;
public BulkShardRequest(StreamInput in) throws IOException {
super(in);
@ -143,4 +147,9 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
}
}
}
@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + Stream.of(items).mapToLong(Accountable::ramBytesUsed).sum();
}
}


@ -59,6 +59,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -112,23 +113,24 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final NodeClient client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
private final WriteMemoryLimits writeMemoryLimits;
@Inject
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-AutoCreateIndex autoCreateIndex) {
+AutoCreateIndex autoCreateIndex, WriteMemoryLimits writeMemoryLimits) {
this(threadPool, transportService, clusterService, ingestService, shardBulkAction, client, actionFilters,
-indexNameExpressionResolver, autoCreateIndex, System::nanoTime);
+indexNameExpressionResolver, autoCreateIndex, writeMemoryLimits, System::nanoTime);
}
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
-super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.WRITE);
+AutoCreateIndex autoCreateIndex, WriteMemoryLimits writeMemoryLimits, LongSupplier relativeTimeProvider) {
+super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
Objects.requireNonNull(relativeTimeProvider);
this.threadPool = threadPool;
this.clusterService = clusterService;
@ -139,6 +141,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
this.ingestForwarder = new IngestActionForwarder(transportService);
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.writeMemoryLimits = writeMemoryLimits;
clusterService.addStateApplier(this.ingestForwarder);
}
@ -149,7 +152,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
* @param docWriteRequest The request to find the {@link IndexRequest}
* @return the found {@link IndexRequest} or {@code null} if one can not be found.
*/
-public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteRequest) {
+public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteRequest) {
IndexRequest indexRequest = null;
if (docWriteRequest instanceof IndexRequest) {
indexRequest = (IndexRequest) docWriteRequest;
@ -162,6 +165,17 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
long indexingBytes = bulkRequest.ramBytesUsed();
final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
try {
doInternalExecute(task, bulkRequest, releasingListener);
} catch (Exception e) {
releasingListener.onFailure(e);
}
}
protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
@ -749,7 +763,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
// before we continue the bulk request we should fork back on a write thread:
if (originalThread == Thread.currentThread()) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
-doExecute(task, bulkRequest, actionListener);
+doInternalExecute(task, bulkRequest, actionListener);
} else {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
@Override
@ -759,7 +773,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
@Override
protected void doRun() throws Exception {
-doExecute(task, bulkRequest, actionListener);
+doInternalExecute(task, bulkRequest, actionListener);
}
@Override


@ -90,9 +90,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
@Inject
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
-MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
+MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
+WriteMemoryLimits writeMemoryLimits) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
-BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
+BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false, writeMemoryLimits);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
}
@ -108,7 +109,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
-protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard primary,
+protected void dispatchedShardOperationOnPrimary(BulkShardRequest request, IndexShard primary,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis,
@ -136,6 +137,11 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
);
}
@Override
protected long primaryOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}
public static void performOnPrimary(
BulkShardRequest request,
IndexShard primary,
@ -404,9 +410,16 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
-public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
-final Translog.Location location = performOnReplica(request, replica);
-return new WriteReplicaResult<>(request, location, null, replica, logger);
+protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
+ActionListener.completeWith(listener, () -> {
+final Translog.Location location = performOnReplica(request, replica);
+return new WriteReplicaResult<>(request, location, null, replica, logger);
+});
}
@Override
protected long replicaOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {


@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.common.lease.Releasable;
import java.util.concurrent.atomic.AtomicLong;
public class WriteMemoryLimits {
private final AtomicLong coordinatingBytes = new AtomicLong(0);
private final AtomicLong primaryBytes = new AtomicLong(0);
private final AtomicLong replicaBytes = new AtomicLong(0);
public Releasable markCoordinatingOperationStarted(long bytes) {
coordinatingBytes.addAndGet(bytes);
return () -> coordinatingBytes.getAndAdd(-bytes);
}
public long getCoordinatingBytes() {
return coordinatingBytes.get();
}
public Releasable markPrimaryOperationStarted(long bytes) {
primaryBytes.addAndGet(bytes);
return () -> primaryBytes.getAndAdd(-bytes);
}
public long getPrimaryBytes() {
return primaryBytes.get();
}
public Releasable markReplicaOperationStarted(long bytes) {
replicaBytes.getAndAdd(bytes);
return () -> replicaBytes.getAndAdd(-bytes);
}
public long getReplicaBytes() {
return replicaBytes.get();
}
}
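For clarity, a hedged caller-side sketch of how the mark/release pair is intended to be used, mirroring TransportBulkAction#doExecute in this diff: the Releasable is tied to listener completion via ActionListener.runBefore so the bytes are subtracted whether the operation succeeds or fails. The UsageSketch class and its execute helper are hypothetical names introduced here; WriteMemoryLimits and ActionListener.runBefore are real and appear elsewhere in this commit.

import java.util.function.Consumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.lease.Releasable;

// Hypothetical caller, assumed to live in the same package as WriteMemoryLimits.
class UsageSketch {
    private final WriteMemoryLimits writeMemoryLimits = new WriteMemoryLimits();

    <Response> void execute(long requestBytes, Consumer<ActionListener<Response>> operation,
                            ActionListener<Response> listener) {
        // Account for the request bytes up front ...
        final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(requestBytes);
        // ... and release them exactly once, on success or failure alike.
        final ActionListener<Response> releasingListener = ActionListener.runBefore(listener, releasable::close);
        try {
            operation.accept(releasingListener); // must eventually complete releasingListener
        } catch (Exception e) {
            releasingListener.onFailure(e);
        }
    }
}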


@ -19,6 +19,7 @@
package org.elasticsearch.action.delete;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
@ -53,6 +54,8 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest {
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(DeleteRequest.class);
private static final ShardId NO_SHARD_ID = null;
// Set to null initially so we can know to override in bulk requests that have a default type.
@ -340,4 +343,9 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
public String toString() {
return "delete {[" + index + "][" + type() + "][" + id + "]}";
}
@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id);
}
}


@ -19,6 +19,7 @@
package org.elasticsearch.action.index;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
@ -77,6 +78,8 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
*/
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocWriteRequest<IndexRequest>, CompositeIndicesRequest {
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(IndexRequest.class);
/**
* Max length of the source document to include into string()
*
@ -795,4 +798,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
public long getAutoGeneratedTimestamp() {
return autoGeneratedTimestamp;
}
@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.ramBytesUsed());
}
}


@ -20,6 +20,7 @@ package org.elasticsearch.action.resync;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.stream.Stream;
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
@ -54,10 +56,12 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
@Inject
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
-ShardStateAction shardStateAction, ActionFilters actionFilters) {
+ShardStateAction shardStateAction, ActionFilters actionFilters,
+WriteMemoryLimits writeMemoryLimits) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
-true /* we should never reject resync because of thread pool capacity on primary */);
+true, /* we should never reject resync because of thread pool capacity on primary */
+writeMemoryLimits);
}
@Override
@ -83,21 +87,33 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
}
@Override
-protected void shardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary,
+protected void dispatchedShardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary,
ActionListener<PrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse>> listener) {
ActionListener.completeWith(listener,
() -> new WritePrimaryResult<>(performOnPrimary(request), new ResyncReplicationResponse(), null, null, primary, logger));
}
@Override
protected long primaryOperationSize(ResyncReplicationRequest request) {
return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) {
return request;
}
@Override
-protected WriteReplicaResult<ResyncReplicationRequest> shardOperationOnReplica(ResyncReplicationRequest request,
-IndexShard replica) throws Exception {
-Translog.Location location = performOnReplica(request, replica);
-return new WriteReplicaResult<>(request, location, null, replica, logger);
+protected void dispatchedShardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica,
+ActionListener<ReplicaResult> listener) {
+ActionListener.completeWith(listener, () -> {
+Translog.Location location = performOnReplica(request, replica);
+return new WriteReplicaResult<>(request, location, null, replica, logger);
+});
}
@Override
protected long replicaOperationSize(ResyncReplicationRequest request) {
return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {


@ -19,9 +19,6 @@
package org.elasticsearch.action.support;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
@ -30,7 +27,6 @@ import org.elasticsearch.transport.TransportResponse;
public final class ChannelActionListener<
Response extends TransportResponse, Request extends TransportRequest> implements ActionListener<Response> {
-private static final Logger logger = LogManager.getLogger(ChannelActionListener.class);
private final TransportChannel channel;
private final Request request;
private final String actionName;
@ -52,12 +48,6 @@ public final class ChannelActionListener<
@Override
public void onFailure(Exception e) {
-try {
-channel.sendResponse(e);
-} catch (Exception e1) {
-e1.addSuppressed(e);
-logger.warn(() -> new ParameterizedMessage(
-"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
-}
+TransportChannel.sendErrorResponse(channel, actionName, request, e);
}
}


@ -214,13 +214,14 @@ public abstract class TransportReplicationAction<
ActionListener<PrimaryResult<ReplicaRequest, Response>> listener);
/**
-* Synchronously execute the specified replica operation. This is done under a permit from
+* Execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
*/
-protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica) throws Exception;
+protected abstract void shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica,
+ActionListener<ReplicaResult> listener);
/**
* Cluster level block to check before request execution. Returning null means that no blocks need to be checked.
@ -273,13 +274,31 @@ public abstract class TransportReplicationAction<
return false;
}
-protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) {
-execute(task, request, new ChannelActionListener<>(channel, actionName, request));
+private void handleOperationRequest(final Request request, final TransportChannel channel, Task task) {
+Releasable releasable = checkOperationLimits(request);
+ActionListener<Response> listener =
+ActionListener.runBefore(new ChannelActionListener<>(channel, actionName, request), releasable::close);
+execute(task, request, listener);
}
+protected Releasable checkOperationLimits(final Request request) {
+return () -> {};
+}
protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
-new AsyncPrimaryAction(
-request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run();
+Releasable releasable = checkPrimaryLimits(request.getRequest());
+ActionListener<Response> listener =
+ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close);
+try {
+new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run();
+} catch (RuntimeException e) {
+listener.onFailure(e);
+}
}
+protected Releasable checkPrimaryLimits(final Request request) {
+return () -> {};
+}
class AsyncPrimaryAction extends AbstractRunnable {
@ -490,10 +509,21 @@ public abstract class TransportReplicationAction<
}
}
-protected void handleReplicaRequest(final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
-final TransportChannel channel, final Task task) {
-new AsyncReplicaAction(
-replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run();
+protected void handleReplicaRequest(final ConcreteReplicaRequest<ReplicaRequest> replicaRequest, final TransportChannel channel,
+final Task task) {
+Releasable releasable = checkReplicaLimits(replicaRequest.getRequest());
+ActionListener<ReplicaResponse> listener =
+ActionListener.runBefore(new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), releasable::close);
+try {
+new AsyncReplicaAction(replicaRequest, listener, (ReplicationTask) task).run();
+} catch (RuntimeException e) {
+listener.onFailure(e);
+}
}
+protected Releasable checkReplicaLimits(final ReplicaRequest request) {
+return () -> {};
+}
public static class RetryOnReplicaException extends ElasticsearchException {
@ -532,27 +562,31 @@ public abstract class TransportReplicationAction<
@Override
public void onResponse(Releasable releasable) {
+assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
try {
-assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
-final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
-replicaResult.runPostReplicaActions(
-ActionListener.wrap(r -> {
-final TransportReplicationAction.ReplicaResponse response =
-new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
-releasable.close(); // release shard operation lock before responding to caller
-if (logger.isTraceEnabled()) {
-logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
-replicaRequest.getRequest().shardId(),
-replicaRequest.getRequest());
-}
-setPhase(task, "finished");
-onCompletionListener.onResponse(response);
-}, e -> {
-Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
-this.responseWithFailure(e);
-})
-);
-} catch (final Exception e) {
+shardOperationOnReplica(replicaRequest.getRequest(), replica, ActionListener.wrap((replicaResult) ->
+replicaResult.runPostReplicaActions(
+ActionListener.wrap(r -> {
+final ReplicaResponse response =
+new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
+releasable.close(); // release shard operation lock before responding to caller
+if (logger.isTraceEnabled()) {
+logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
+replicaRequest.getRequest().shardId(),
+replicaRequest.getRequest());
+}
+setPhase(task, "finished");
+onCompletionListener.onResponse(response);
+}, e -> {
+Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
+responseWithFailure(e);
+})
+), e -> {
+Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
+AsyncReplicaAction.this.onFailure(e);
+}));
+// TODO: Evaluate if we still need to catch this exception
+} catch (Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
AsyncReplicaAction.this.onFailure(e);
}


@ -22,6 +22,8 @@ package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.WriteRequest;
@ -32,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperParsingException;
@ -57,12 +60,45 @@ public abstract class TransportWriteAction<
Response extends ReplicationResponse & WriteResponse
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
private final boolean forceExecutionOnPrimary;
private final WriteMemoryLimits writeMemoryLimits;
private final String executor;
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
-Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
+Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary,
+WriteMemoryLimits writeMemoryLimits) {
+// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
+// ThreadPool.Names.WRITE thread pool in this class.
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
-request, replicaRequest, executor, true, forceExecutionOnPrimary);
+request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
+this.executor = executor;
+this.forceExecutionOnPrimary = forceExecutionOnPrimary;
+this.writeMemoryLimits = writeMemoryLimits;
}
@Override
protected Releasable checkOperationLimits(Request request) {
return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request));
}
@Override
protected Releasable checkPrimaryLimits(Request request) {
return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request));
}
protected long primaryOperationSize(Request request) {
return 0;
}
@Override
protected Releasable checkReplicaLimits(ReplicaRequest request) {
return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request));
}
protected long replicaOperationSize(ReplicaRequest request) {
return 0;
}
/** Syncs operation result to the translog or throws a shard not available failure */
@ -104,18 +140,48 @@ public abstract class TransportWriteAction<
* and failure async refresh is performed on the <code>primary</code> shard according to the <code>Request</code> refresh policy
*/
@Override
-protected abstract void shardOperationOnPrimary(
-Request request, IndexShard primary, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener);
+protected void shardOperationOnPrimary(
+Request request, IndexShard primary, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener) {
+threadPool.executor(executor).execute(new ActionRunnable<PrimaryResult<ReplicaRequest, Response>>(listener) {
+@Override
+protected void doRun() {
+dispatchedShardOperationOnPrimary(request, primary, listener);
+}
+@Override
+public boolean isForceExecution() {
+return forceExecutionOnPrimary;
+}
+});
+}
+protected abstract void dispatchedShardOperationOnPrimary(
+Request request, IndexShard primary, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener);
/**
* Called once per replica with a reference to the replica {@linkplain IndexShard} to modify.
*
-* @return the result of the operation on replica, including current translog location and operation response and failure
-* async refresh is performed on the <code>replica</code> shard according to the <code>ReplicaRequest</code> refresh policy
+* @param listener listener for the result of the operation on replica, including current translog location and operation
+* response and failure async refresh is performed on the <code>replica</code> shard according to the <code>ReplicaRequest</code>
+* refresh policy
*/
@Override
-protected abstract WriteReplicaResult<ReplicaRequest> shardOperationOnReplica(
-ReplicaRequest request, IndexShard replica) throws Exception;
+protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
+threadPool.executor(executor).execute(new ActionRunnable<ReplicaResult>(listener) {
+@Override
+protected void doRun() {
+dispatchedShardOperationOnReplica(request, replica, listener);
+}
+@Override
+public boolean isForceExecution() {
+return true;
+}
+});
+}
+protected abstract void dispatchedShardOperationOnReplica(
+ReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener);
/**
* Result of taking the action on the primary.


@ -19,6 +19,7 @@
package org.elasticsearch.action.update;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest;
@ -59,6 +60,9 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
implements DocWriteRequest<UpdateRequest>, WriteRequest<UpdateRequest>, ToXContentObject {
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(UpdateRequest.class);
private static ObjectParser<UpdateRequest, Void> PARSER;
private static final ParseField SCRIPT_FIELD = new ParseField("script");
@ -1010,4 +1014,16 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
res.append(", detect_noop[").append(detectNoop).append("]");
return res.append("}").toString();
}
@Override
public long ramBytesUsed() {
long childRequestBytes = 0;
if (doc != null) {
childRequestBytes += doc.ramBytesUsed();
}
if (upsertRequest != null) {
childRequestBytes += upsertRequest.ramBytesUsed();
}
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + childRequestBytes;
}
}


@ -108,9 +108,11 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
}
@Override
-protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard indexShard) throws Exception {
-maybeSyncTranslog(indexShard);
-return new ReplicaResult();
+protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
+ActionListener.completeWith(listener, () -> {
+maybeSyncTranslog(replica);
+return new ReplicaResult();
+});
}
private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {


@ -38,7 +38,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
@ -165,12 +164,14 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
}
@Override
-protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws WriteStateException {
-Objects.requireNonNull(request);
-Objects.requireNonNull(replica);
-replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
-replica.persistRetentionLeases();
-return new ReplicaResult();
+protected void shardOperationOnReplica(Request request, IndexShard replica, ActionListener<ReplicaResult> listener) {
+ActionListener.completeWith(listener, () -> {
+Objects.requireNonNull(request);
+Objects.requireNonNull(replica);
+replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
+replica.persistRetentionLeases();
+return new ReplicaResult();
+});
}
public static final class Request extends ReplicationRequest<Request> {


@ -25,6 +25,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteResponse;
@ -40,7 +41,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
@ -79,7 +79,8 @@ public class RetentionLeaseSyncAction extends
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
-final ActionFilters actionFilters) {
+final ActionFilters actionFilters,
+final WriteMemoryLimits writeMemoryLimits) {
super(
settings,
ACTION_NAME,
@ -91,7 +92,7 @@ public class RetentionLeaseSyncAction extends
actionFilters,
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
-ThreadPool.Names.MANAGEMENT, false);
+ThreadPool.Names.MANAGEMENT, false, writeMemoryLimits);
}
@Override
@ -146,7 +147,7 @@ public class RetentionLeaseSyncAction extends
}
@Override
-protected void shardOperationOnPrimary(Request request, IndexShard primary,
+protected void dispatchedShardOperationOnPrimary(Request request, IndexShard primary,
ActionListener<PrimaryResult<Request, Response>> listener) {
ActionListener.completeWith(listener, () -> {
assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
@ -158,14 +159,15 @@ public class RetentionLeaseSyncAction extends
}
@Override
-protected WriteReplicaResult<Request> shardOperationOnReplica(
-final Request request,
-final IndexShard replica) throws WriteStateException {
-Objects.requireNonNull(request);
-Objects.requireNonNull(replica);
-replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
-replica.persistRetentionLeases();
-return new WriteReplicaResult<>(request, null, null, replica, getLogger());
+protected void dispatchedShardOperationOnReplica(Request request, IndexShard replica,
+ActionListener<ReplicaResult> listener) {
+ActionListener.completeWith(listener, () -> {
+Objects.requireNonNull(request);
+Objects.requireNonNull(replica);
+replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
+replica.persistRetentionLeases();
+return new WriteReplicaResult<>(request, null, null, replica, getLogger());
+});
}
@Override


@ -31,6 +31,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchTransportService;
@ -593,6 +594,7 @@ public class Node implements Closeable {
new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
resourcesToClose.add(persistentTasksClusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits();
modules.add(b -> {
b.bind(Node.class).toInstance(this);
@ -611,6 +613,7 @@ public class Node implements Closeable {
b.bind(ScriptService.class).toInstance(scriptService);
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(WriteMemoryLimits.class).toInstance(bulkIndexingLimits);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);


@ -19,6 +19,9 @@
package org.elasticsearch.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import java.io.IOException;
@ -28,6 +31,8 @@ import java.io.IOException;
*/
public interface TransportChannel {
Logger logger = LogManager.getLogger(TransportChannel.class);
String getProfileName();
String getChannelType();
@ -42,4 +47,17 @@ public interface TransportChannel {
default Version getVersion() {
return Version.CURRENT;
}
/**
* A helper method to send an exception and handle and log a subsequent exception
*/
static void sendErrorResponse(TransportChannel channel, String actionName, TransportRequest request, Exception e) {
try {
channel.sendResponse(e);
} catch (Exception sendException) {
sendException.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"Failed to send error response for action [{}] and request [{}]", actionName, request), sendException);
}
}
}


@ -120,7 +120,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
final ExecutorService direct = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(direct);
TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
-null, null, null, mock(ActionFilters.class), null, null) {
+null, null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits()) {
@Override
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {


@ -142,7 +142,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
new AutoCreateIndex(
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new IndexNameExpressionResolver()
-)
+), new WriteMemoryLimits()
);
}
@Override


@ -27,7 +27,9 @@ import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetadata;
@ -78,7 +80,8 @@ public class TransportBulkActionTests extends ESTestCase {
TestTransportBulkAction() {
super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null,
null, new ActionFilters(Collections.emptySet()), new Resolver(),
-new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()));
+new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()),
+new WriteMemoryLimits());
}
@Override
@ -120,38 +123,36 @@ public class TransportBulkActionTests extends ESTestCase {
public void testDeleteNonExistingDocDoesNotCreateIndex() throws Exception {
BulkRequest bulkRequest = new BulkRequest().add(new DeleteRequest("index", "type", "id"));
-bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> {
-assertFalse(bulkAction.indexCreated);
-BulkItemResponse[] bulkResponses = ((BulkResponse) response).getItems();
-assertEquals(bulkResponses.length, 1);
-assertTrue(bulkResponses[0].isFailed());
-assertTrue(bulkResponses[0].getFailure().getCause() instanceof IndexNotFoundException);
-assertEquals("index", bulkResponses[0].getFailure().getIndex());
-}, exception -> {
-throw new AssertionError(exception);
-}));
+PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
+ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
+BulkResponse response = future.actionGet();
+assertFalse(bulkAction.indexCreated);
+BulkItemResponse[] bulkResponses = ((BulkResponse) response).getItems();
+assertEquals(bulkResponses.length, 1);
+assertTrue(bulkResponses[0].isFailed());
+assertTrue(bulkResponses[0].getFailure().getCause() instanceof IndexNotFoundException);
+assertEquals("index", bulkResponses[0].getFailure().getIndex());
}
public void testDeleteNonExistingDocExternalVersionCreatesIndex() throws Exception {
BulkRequest bulkRequest = new BulkRequest()
.add(new DeleteRequest("index", "type", "id").versionType(VersionType.EXTERNAL).version(0));
-bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> {
-assertTrue(bulkAction.indexCreated);
-}, exception -> {
-throw new AssertionError(exception);
-}));
+PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
+ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
+future.actionGet();
+assertTrue(bulkAction.indexCreated);
}
public void testDeleteNonExistingDocExternalGteVersionCreatesIndex() throws Exception {
BulkRequest bulkRequest = new BulkRequest()
.add(new DeleteRequest("index2", "type", "id").versionType(VersionType.EXTERNAL_GTE).version(0));
-bulkAction.execute(null, bulkRequest, ActionListener.wrap(response -> {
-assertTrue(bulkAction.indexCreated);
-}, exception -> {
-throw new AssertionError(exception);
-}));
+PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
+ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
+future.actionGet();
+assertTrue(bulkAction.indexCreated);
}
public void testGetIndexWriteRequest() throws Exception {


@ -240,6 +240,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
actionFilters,
indexNameExpressionResolver,
autoCreateIndex,
new WriteMemoryLimits(),
relativeTimeProvider);
}


@ -20,6 +20,7 @@ package org.elasticsearch.action.resync;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
@ -143,7 +144,8 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
when(indexServices.indexServiceSafe(eq(index))).thenReturn(indexService);
final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
-clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()));
+clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()),
+new WriteMemoryLimits());
assertThat(action.globalBlockLevel(), nullValue());
assertThat(action.indexBlockLevel(), nullValue());

@@ -921,14 +921,17 @@ public class TransportReplicationActionTests extends ESTestCase {
final ReplicationTask task = maybeTask();
TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithExceptions", transportService, clusterService,
shardStateAction, threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
assertIndexShardCounter(1);
assertPhase(task, "replica");
if (throwException) {
throw new ElasticsearchException("simulated");
}
return new ReplicaResult();
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
assertIndexShardCounter(1);
assertPhase(task, "replica");
if (throwException) {
throw new ElasticsearchException("simulated");
}
return new ReplicaResult();
});
}
};
try {
@@ -1057,12 +1060,14 @@ public class TransportReplicationActionTests extends ESTestCase {
TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithExceptions", transportService, clusterService,
shardStateAction, threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
assertPhase(task, "replica");
if (throwException.get()) {
throw new RetryOnReplicaException(shardId, "simulation");
}
return new ReplicaResult();
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
assertPhase(task, "replica");
if (throwException.get()) {
throw new RetryOnReplicaException(shardId, "simulation");
}
return new ReplicaResult();
});
}
};
final PlainActionFuture<TransportResponse> listener = new PlainActionFuture<>();
@@ -1124,13 +1129,15 @@ public class TransportReplicationActionTests extends ESTestCase {
TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithExceptions", transportService, clusterService,
shardStateAction, threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
assertPhase(task, "replica");
if (throwException.get()) {
throw new RetryOnReplicaException(shardId, "simulation");
}
calledSuccessfully.set(true);
return new ReplicaResult();
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
assertPhase(task, "replica");
if (throwException.get()) {
throw new RetryOnReplicaException(shardId, "simulation");
}
calledSuccessfully.set(true);
return new ReplicaResult();
});
}
};
final PlainActionFuture<TransportResponse> listener = new PlainActionFuture<>();
@@ -1282,9 +1289,9 @@ public class TransportReplicationActionTests extends ESTestCase {
}
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
protected void shardOperationOnReplica(Request request, IndexShard replica, ActionListener<ReplicaResult> listener) {
request.processedOnReplicas.incrementAndGet();
return new ReplicaResult();
listener.onResponse(new ReplicaResult());
}
}
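The recurring change in this file turns shardOperationOnReplica from a synchronous method returning ReplicaResult into one that completes an ActionListener. ActionListener.completeWith preserves the old bodies verbatim: the supplier's return value is delivered through onResponse, and anything thrown is delivered through onFailure. A minimal sketch with illustrative names and types:

import org.elasticsearch.action.ActionListener;

// Illustrative synchronous-to-listener conversion; the String result and the
// failure flag stand in for the real request/result types.
final class CompleteWithSketch {

    static void operationOnReplica(boolean shouldFail, ActionListener<String> listener) {
        ActionListener.completeWith(listener, () -> {
            if (shouldFail) {
                throw new IllegalStateException("simulated"); // delivered via listener.onFailure
            }
            return "replica-result"; // delivered via listener.onResponse
        });
    }
}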

@@ -446,14 +446,14 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
}
@Override
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception {
protected void shardOperationOnReplica(Request shardRequest, IndexShard shard, ActionListener<ReplicaResult> listener) {
assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2").getId(),
shard.routingEntry().currentNodeId());
executedOnReplica.set(true);
// The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here
// that the permit has been acquired on the replica shard
assertSame(replica, shard);
return new ReplicaResult();
listener.onResponse(new ReplicaResult());
}
}
@@ -505,10 +505,10 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
}
@Override
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception {
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
assertNoBlocks("block must not exist when executing the operation on replica shard: it should have been blocked before");
assertThat(shard.getActiveOperationsCount(), greaterThan(0));
return super.shardOperationOnReplica(shardRequest, shard);
assertThat(replica.getActiveOperationsCount(), greaterThan(0));
super.shardOperationOnReplica(shardRequest, replica, listener);
}
private void assertNoBlocks(final String error) {
@@ -551,9 +551,9 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
}
@Override
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception {
assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, shard.getActiveOperationsCount());
return super.shardOperationOnReplica(shardRequest, shard);
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, replica.getActiveOperationsCount());
super.shardOperationOnReplica(shardRequest, replica, listener);
}
}

@@ -21,6 +21,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
@@ -137,7 +138,7 @@ public class TransportWriteActionTests extends ESTestCase {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit
TestAction testAction = new TestAction();
testAction.shardOperationOnPrimary(request, indexShard,
testAction.dispatchedShardOperationOnPrimary(request, indexShard,
ActionTestUtils.assertNoFailureListener(result -> {
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
@@ -152,8 +153,9 @@ public class TransportWriteActionTests extends ESTestCase {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit
TestAction testAction = new TestAction();
TransportWriteAction.WriteReplicaResult<TestRequest> result =
testAction.shardOperationOnReplica(request, indexShard);
final PlainActionFuture<TransportReplicationAction.ReplicaResult> future = PlainActionFuture.newFuture();
testAction.dispatchedShardOperationOnReplica(request, indexShard, future);
final TransportReplicationAction.ReplicaResult result = future.actionGet();
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
assertNotNull(listener.response);
@@ -166,7 +168,7 @@ public class TransportWriteActionTests extends ESTestCase {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
TestAction testAction = new TestAction();
testAction.shardOperationOnPrimary(request, indexShard,
testAction.dispatchedShardOperationOnPrimary(request, indexShard,
ActionTestUtils.assertNoFailureListener(result -> {
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
@@ -182,8 +184,9 @@ public class TransportWriteActionTests extends ESTestCase {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
TestAction testAction = new TestAction();
TransportWriteAction.WriteReplicaResult<TestRequest> result =
testAction.shardOperationOnReplica(request, indexShard);
final PlainActionFuture<TransportReplicationAction.ReplicaResult> future = PlainActionFuture.newFuture();
testAction.dispatchedShardOperationOnReplica(request, indexShard, future);
final TransportReplicationAction.ReplicaResult result = future.actionGet();
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
assertNotNull(listener.response);
@@ -197,7 +200,7 @@ public class TransportWriteActionTests extends ESTestCase {
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
TestAction testAction = new TestAction();
testAction.shardOperationOnPrimary(request, indexShard,
testAction.dispatchedShardOperationOnPrimary(request, indexShard,
ActionTestUtils.assertNoFailureListener(result -> {
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
@@ -221,7 +224,9 @@ public class TransportWriteActionTests extends ESTestCase {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
TestAction testAction = new TestAction();
TransportWriteAction.WriteReplicaResult<TestRequest> result = testAction.shardOperationOnReplica(request, indexShard);
final PlainActionFuture<TransportReplicationAction.ReplicaResult> future = PlainActionFuture.newFuture();
testAction.dispatchedShardOperationOnReplica(request, indexShard, future);
final TransportReplicationAction.ReplicaResult result = future.actionGet();
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
assertNull(listener.response); // Haven't responded yet
@@ -240,7 +245,7 @@ public class TransportWriteActionTests extends ESTestCase {
public void testDocumentFailureInShardOperationOnPrimary() throws Exception {
TestRequest request = new TestRequest();
TestAction testAction = new TestAction(true, true);
testAction.shardOperationOnPrimary(request, indexShard,
testAction.dispatchedShardOperationOnPrimary(request, indexShard,
ActionTestUtils.assertNoFailureListener(result -> {
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
@@ -252,8 +257,9 @@ public class TransportWriteActionTests extends ESTestCase {
public void testDocumentFailureInShardOperationOnReplica() throws Exception {
TestRequest request = new TestRequest();
TestAction testAction = new TestAction(randomBoolean(), true);
TransportWriteAction.WriteReplicaResult<TestRequest> result =
testAction.shardOperationOnReplica(request, indexShard);
final PlainActionFuture<TransportReplicationAction.ReplicaResult> future = PlainActionFuture.newFuture();
testAction.dispatchedShardOperationOnReplica(request, indexShard, future);
final TransportReplicationAction.ReplicaResult result = future.actionGet();
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
assertNull(listener.response);
@@ -360,7 +366,8 @@ public class TransportWriteActionTests extends ESTestCase {
super(Settings.EMPTY, "internal:test",
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
new WriteMemoryLimits());
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
@@ -369,7 +376,8 @@ public class TransportWriteActionTests extends ESTestCase {
ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService,
mockIndicesService(clusterService), threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
new WriteMemoryLimits());
this.withDocumentFailureOnPrimary = false;
this.withDocumentFailureOnReplica = false;
}
@@ -381,7 +389,7 @@ public class TransportWriteActionTests extends ESTestCase {
}
@Override
protected void shardOperationOnPrimary(
protected void dispatchedShardOperationOnPrimary(
TestRequest request, IndexShard primary, ActionListener<PrimaryResult<TestRequest, TestResponse>> listener) {
ActionListener.completeWith(listener, () -> {
if (withDocumentFailureOnPrimary) {
@@ -393,14 +401,16 @@ public class TransportWriteActionTests extends ESTestCase {
}
@Override
protected WriteReplicaResult<TestRequest> shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception {
final WriteReplicaResult<TestRequest> replicaResult;
if (withDocumentFailureOnReplica) {
replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger);
} else {
replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger);
}
return replicaResult;
protected void dispatchedShardOperationOnReplica(TestRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
final WriteReplicaResult<TestRequest> replicaResult;
if (withDocumentFailureOnReplica) {
replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger);
} else {
replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger);
}
return replicaResult;
});
}
}
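The tests in this file now call dispatchedShardOperationOnPrimary and dispatchedShardOperationOnReplica, and the overrides carry those names as well, which suggests the base TransportWriteAction intercepts the public entry points to account for request bytes before handing off to the subclass hook. A rough sketch of that indirection, reusing the WriteBytesTracker sketch above; the method names mirror the diff, but the accounting and release details are assumptions:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.lease.Releasable;

// Assumed base-class wrapper: account for the request, dispatch to the subclass,
// release the accounted bytes once the listener completes either way.
abstract class AccountingWriteAction<Request, Result> {

    private final WriteBytesTracker tracker = new WriteBytesTracker();

    final void shardOperationOnPrimary(Request request, ActionListener<Result> listener) {
        Releasable releasable = tracker.markWriteOperationStarted(primaryOperationSize(request));
        // runAfter releases the bytes after onResponse or onFailure fires
        dispatchedShardOperationOnPrimary(request, ActionListener.runAfter(listener, releasable::close));
    }

    protected abstract long primaryOperationSize(Request request);

    protected abstract void dispatchedShardOperationOnPrimary(Request request, ActionListener<Result> listener);
}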

@@ -116,7 +116,8 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
if (randomBoolean()) {
action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {}));
} else {
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard);
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard,
ActionTestUtils.assertNoFailureListener(r -> {}));
}
if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {

@@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.service.ClusterService;
@@ -149,7 +150,9 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
final RetentionLeaseBackgroundSyncAction.Request request =
new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases);
final TransportReplicationAction.ReplicaResult result = action.shardOperationOnReplica(request, indexShard);
final PlainActionFuture<TransportReplicationAction.ReplicaResult> listener = PlainActionFuture.newFuture();
action.shardOperationOnReplica(request, indexShard, listener);
final TransportReplicationAction.ReplicaResult result = listener.actionGet();
// the retention leases on the shard should be updated
verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases);
// the retention leases on the shard should be persisted

@@ -20,9 +20,11 @@
package org.elasticsearch.index.seqno;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@@ -102,10 +104,11 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new WriteMemoryLimits());
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
action.shardOperationOnPrimary(request, indexShard,
action.dispatchedShardOperationOnPrimary(request, indexShard,
ActionTestUtils.assertNoFailureListener(result -> {
// the retention leases on the shard should be persisted
verify(indexShard).persistRetentionLeases();
@@ -138,12 +141,14 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new WriteMemoryLimits());
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
final TransportWriteAction.WriteReplicaResult<RetentionLeaseSyncAction.Request> result =
action.shardOperationOnReplica(request, indexShard);
PlainActionFuture<TransportReplicationAction.ReplicaResult> listener = PlainActionFuture.newFuture();
action.dispatchedShardOperationOnReplica(request, indexShard, listener);
final TransportReplicationAction.ReplicaResult result = listener.actionGet();
// the retention leases on the shard should be updated
verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases);
// the retention leases on the shard should be persisted
@@ -176,7 +181,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()));
new ActionFilters(Collections.emptySet()),
new WriteMemoryLimits());
assertNull(action.indexBlockLevel());
}

@@ -65,6 +65,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAct
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
@@ -1484,7 +1485,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
indicesService,
threadPool,
shardStateAction,
actionFilters)),
actionFilters,
new WriteMemoryLimits())),
new GlobalCheckpointSyncAction(
settings,
transportService,
@@ -1510,7 +1512,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
mappingUpdatedAction.setClient(client);
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
actionFilters);
actionFilters, new WriteMemoryLimits());
actions.put(BulkAction.INSTANCE,
new TransportBulkAction(threadPool, transportService, clusterService,
new IngestService(
@@ -1518,7 +1520,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
Collections.emptyList(), client),
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver)
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new WriteMemoryLimits()
));
final RestoreService restoreService = new RestoreService(
clusterService, repositoriesService, allocationService,

@@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.tasks.Task;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
@@ -37,6 +38,16 @@ public class ActionTestUtils {
return future.actionGet();
}
/**
* Executes the given action.
*
* This is a shim method to make execution publicly available in tests.
*/
public static <Request extends ActionRequest, Response extends ActionResponse>
void execute(TransportAction<Request, Response> action, Task task, Request request, ActionListener<Response> listener) {
action.execute(task, request, listener);
}
public static <T> ActionListener<T> assertNoFailureListener(CheckedConsumer<T, Exception> consumer) {
return ActionListener.wrap(consumer, e -> {
throw new AssertionError(e);
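Both helpers in this hunk serve test ergonomics: execute exposes the protected TransportAction#execute to other packages, and assertNoFailureListener turns a checked consumer into a listener whose onFailure fails the test. A hypothetical usage combining the two, with placeholder generics standing in for concrete action types:

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.TransportAction;

// Hypothetical test fragment; any onFailure surfaces as an AssertionError.
final class ShimUsageSketch {

    static <Request extends ActionRequest, Response extends ActionResponse>
    void executeAndExpectSuccess(TransportAction<Request, Response> action, Request request) {
        ActionTestUtils.execute(action, null, request, // null Task, as in the tests above
            ActionTestUtils.assertNoFailureListener(response -> {
                // onResponse was reached; response assertions would go here
            }));
    }
}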

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.ccr.action.bulk;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.WriteMemoryLimits;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
@@ -43,7 +44,8 @@ public class TransportBulkShardOperationsAction
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters) {
final ActionFilters actionFilters,
final WriteMemoryLimits writeMemoryLimits) {
super(
settings,
BulkShardOperationsAction.NAME,
@@ -55,11 +57,11 @@ public class TransportBulkShardOperationsAction
actionFilters,
BulkShardOperationsRequest::new,
BulkShardOperationsRequest::new,
ThreadPool.Names.WRITE, false);
ThreadPool.Names.WRITE, false, writeMemoryLimits);
}
@Override
protected void shardOperationOnPrimary(BulkShardOperationsRequest request, IndexShard primary,
protected void dispatchedShardOperationOnPrimary(BulkShardOperationsRequest request, IndexShard primary,
ActionListener<PrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse>> listener) {
if (logger.isTraceEnabled()) {
logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry());
@@ -68,6 +70,11 @@ public class TransportBulkShardOperationsAction
request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger));
}
@Override
protected long primaryOperationSize(BulkShardOperationsRequest request) {
return request.getOperations().stream().mapToLong(Translog.Operation::estimateSize).sum();
}
public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) {
final Translog.Operation operationWithPrimaryTerm;
switch (operation.opType()) {
@@ -160,12 +167,19 @@ public class TransportBulkShardOperationsAction
}
@Override
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("index [{}] on the following replica shard {}", request.getOperations(), replica.routingEntry());
}
return shardOperationOnReplica(request, replica, logger);
protected void dispatchedShardOperationOnReplica(BulkShardOperationsRequest request, IndexShard replica,
ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
if (logger.isTraceEnabled()) {
logger.trace("index [{}] on the following replica shard {}", request.getOperations(), replica.routingEntry());
}
return shardOperationOnReplica(request, replica, logger);
});
}
@Override
protected long replicaOperationSize(BulkShardOperationsRequest request) {
return request.getOperations().stream().mapToLong(Translog.Operation::estimateSize).sum();
}
// public for testing purposes only
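The two size overrides above capture the heart of the accounting: a bulk-shard request's memory cost is approximated by summing the estimated size of its translog operations. A minimal standalone sketch of that computation (the helper class name is illustrative; Translog.Operation.estimateSize comes straight from the diff):

import java.util.List;

import org.elasticsearch.index.translog.Translog;

// Illustrative standalone version of the primaryOperationSize/replicaOperationSize math.
final class OperationSizeSketch {

    static long estimateRequestSize(List<Translog.Operation> operations) {
        return operations.stream().mapToLong(Translog.Operation::estimateSize).sum();
    }
}

Because both overrides return the same sum, a request is accounted identically whether it executes on the primary or on a replica.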