From 4a51ea8c8e5cc69244192305d6236516a17689d7 Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Tue, 19 Jul 2016 14:49:48 -0400 Subject: [PATCH] Before, transport replication actions implemented a checkWriteConsistency() method to determine if a write consistency check should be performed before proceeding with the action. This commit removes this method from the transport replication actions in favor of setting the ActiveShardCount on the request, with setting the value to ActiveShardCount.NONE if the transport action's checkWriteConsistency() method returned false. --- .../admin/indices/flush/ShardFlushRequest.java | 2 ++ .../indices/flush/TransportShardFlushAction.java | 5 ----- .../refresh/TransportShardRefreshAction.java | 15 +++++++++------ .../support/replication/ReplicationOperation.java | 7 ++----- .../support/replication/ReplicationRequest.java | 2 +- .../replication/TransportReplicationAction.java | 9 +-------- .../replication/ReplicationOperationTests.java | 11 ++++++----- .../TransportReplicationActionTests.java | 9 +++------ .../ESIndexLevelReplicationTestCase.java | 5 +++-- 9 files changed, 27 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index 3a9ec89db5d..83eaf11ca3a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -33,6 +34,7 @@ public class ShardFlushRequest extends ReplicationRequest { public ShardFlushRequest(FlushRequest request, ShardId shardId) { super(shardId); this.request = request; + this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default } public ShardFlushRequest() { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index a6213c4f925..570307a717d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -69,11 +69,6 @@ public class TransportShardFlushAction extends TransportReplicationAction { public static final String NAME = RefreshAction.NAME + "[s]"; + private static final Supplier requestSupplier = () -> { + BasicReplicationRequest req = new BasicReplicationRequest(); + req.waitForActiveShards(ActiveShardCount.NONE); + return req; + }; @Inject public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH); + indexNameExpressionResolver, requestSupplier, requestSupplier, ThreadPool.Names.REFRESH); } @Override @@ -70,11 +78,6 @@ public class TransportShardRefreshAction return new ReplicaResult(); } - @Override - protected boolean checkActiveShardCount() { - return false; - } - @Override protected ClusterBlockLevel globalBlockLevel() { return ClusterBlockLevel.METADATA_WRITE; diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index f5d3e3f3c31..3577ca0c95c 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -68,7 +68,6 @@ public class ReplicationOperation< private final AtomicInteger pendingShards = new AtomicInteger(); private final AtomicInteger successfulShards = new AtomicInteger(); private final boolean executeOnReplicas; - private final boolean checkActiveShardCount; private final Primary primary; private final Replicas replicasProxy; private final AtomicBoolean finished = new AtomicBoolean(); @@ -80,10 +79,8 @@ public class ReplicationOperation< public ReplicationOperation(Request request, Primary primary, ActionListener listener, - boolean executeOnReplicas, boolean checkActiveShardCount, - Replicas replicas, + boolean executeOnReplicas, Replicas replicas, Supplier clusterStateSupplier, ESLogger logger, String opType) { - this.checkActiveShardCount = checkActiveShardCount; this.executeOnReplicas = executeOnReplicas; this.replicasProxy = replicas; this.primary = primary; @@ -95,7 +92,7 @@ public class ReplicationOperation< } public void execute() throws Exception { - final String activeShardCountFailure = checkActiveShardCount ? checkActiveShardCount() : null; + final String activeShardCountFailure = checkActiveShardCount(); final ShardRouting primaryRouting = primary.routingEntry(); final ShardId primaryId = primaryRouting.shardId(); if (activeShardCountFailure != null) { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index bd63db20ecb..314e55f2b81 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -63,7 +63,7 @@ public abstract class ReplicationRequest listener, PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { return new ReplicationOperation<>(request, primaryShardReference, listener, - executeOnReplicas, checkActiveShardCount(), replicasProxy, clusterService::state, logger, actionName + executeOnReplicas, replicasProxy, clusterService::state, logger, actionName ); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index c1482030248..6d4d4d404f8 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -136,7 +136,7 @@ public class ReplicationOperationTests extends ESTestCase { Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); final TestReplicationOperation op = new TestReplicationOperation(request, - new TestPrimary(primaryShard, primaryTerm), listener, false, false, + new TestPrimary(primaryShard, primaryTerm), listener, false, new TestReplicaProxy(), () -> state, logger, "test"); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -281,7 +281,7 @@ public class ReplicationOperationTests extends ESTestCase { final ShardRouting primaryShard = shardRoutingTable.primaryShard(); final TestReplicationOperation op = new TestReplicationOperation(request, new TestPrimary(primaryShard, primaryTerm), - listener, randomBoolean(), true, new TestReplicaProxy(), () -> state, logger, "test"); + listener, randomBoolean(), new TestReplicaProxy(), () -> state, logger, "test"); if (passesActiveShardCheck) { assertThat(op.checkActiveShardCount(), nullValue()); @@ -329,6 +329,7 @@ public class ReplicationOperationTests extends ESTestCase { this(); this.shardId = shardId; this.index = shardId.getIndexName(); + this.waitForActiveShards = ActiveShardCount.NONE; // keep things simple } @@ -440,13 +441,13 @@ public class ReplicationOperationTests extends ESTestCase { class TestReplicationOperation extends ReplicationOperation { public TestReplicationOperation(Request request, Primary primary, ActionListener listener, Replicas replicas, Supplier clusterStateSupplier) { - this(request, primary, listener, true, false, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test"); + this(request, primary, listener, true, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test"); } public TestReplicationOperation(Request request, Primary primary, - ActionListener listener, boolean executeOnReplicas, boolean checkActiveShardCount, + ActionListener listener, boolean executeOnReplicas, Replicas replicas, Supplier clusterStateSupplier, ESLogger logger, String opType) { - super(request, primary, listener, executeOnReplicas, checkActiveShardCount, replicas, clusterStateSupplier, logger, opType); + super(request, primary, listener, executeOnReplicas, replicas, clusterStateSupplier, logger, opType); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index ef8ee623939..f0eb8d0de92 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; @@ -719,6 +720,7 @@ public class TransportReplicationActionTests extends ESTestCase { this(); this.shardId = shardId; this.index = shardId.getIndexName(); + this.waitForActiveShards = ActiveShardCount.NONE; // keep things simple } @@ -765,11 +767,6 @@ public class TransportReplicationActionTests extends ESTestCase { return new ReplicaResult(); } - @Override - protected boolean checkActiveShardCount() { - return false; - } - @Override protected boolean resolveIndex() { return false; @@ -815,7 +812,7 @@ public class TransportReplicationActionTests extends ESTestCase { class NoopReplicationOperation extends ReplicationOperation { public NoopReplicationOperation(Request request, ActionListener listener) { - super(request, null, listener, true, true, null, null, TransportReplicationActionTests.this.logger, "noop"); + super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop"); } @Override diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index d561536d2b9..552e378be71 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.TransportIndexAction; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -251,7 +252,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { public int indexDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet())) - .source("{}"); + .source("{}").waitForActiveShards(ActiveShardCount.NONE); final IndexResponse response = index(indexRequest); assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); } @@ -398,7 +399,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { private final ReplicationGroup replicationGroup; public IndexingOp(IndexRequest request, ActionListener listener, ReplicationGroup replicationGroup) { - super(request, new PrimaryRef(replicationGroup), listener, true, false, new ReplicasRef(replicationGroup), + super(request, new PrimaryRef(replicationGroup), listener, true, new ReplicasRef(replicationGroup), () -> null, logger, "indexing"); this.replicationGroup = replicationGroup; request.process(null, true, request.index());