Before, transport replication actions implemented a checkWriteConsistency()

method to determine if a write consistency check should be performed
before proceeding with the action.  This commit removes this method from
the transport replication actions in favor of setting the ActiveShardCount
on the request, with setting the value to ActiveShardCount.NONE if the
transport action's checkWriteConsistency() method returned false.
This commit is contained in:
Ali Beyad 2016-07-19 14:49:48 -04:00
parent d93f7d6085
commit 4a51ea8c8e
9 changed files with 27 additions and 38 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -33,6 +34,7 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
public ShardFlushRequest(FlushRequest request, ShardId shardId) {
super(shardId);
this.request = request;
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
}
public ShardFlushRequest() {

View File

@ -69,11 +69,6 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
return new ReplicaResult();
}
@Override
protected boolean checkActiveShardCount() {
return false;
}
@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
@ -35,17 +36,24 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.function.Supplier;
public class TransportShardRefreshAction
extends TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> {
public static final String NAME = RefreshAction.NAME + "[s]";
private static final Supplier<BasicReplicationRequest> requestSupplier = () -> {
BasicReplicationRequest req = new BasicReplicationRequest();
req.waitForActiveShards(ActiveShardCount.NONE);
return req;
};
@Inject
public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
indexNameExpressionResolver, requestSupplier, requestSupplier, ThreadPool.Names.REFRESH);
}
@Override
@ -70,11 +78,6 @@ public class TransportShardRefreshAction
return new ReplicaResult();
}
@Override
protected boolean checkActiveShardCount() {
return false;
}
@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;

View File

@ -68,7 +68,6 @@ public class ReplicationOperation<
private final AtomicInteger pendingShards = new AtomicInteger();
private final AtomicInteger successfulShards = new AtomicInteger();
private final boolean executeOnReplicas;
private final boolean checkActiveShardCount;
private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
private final Replicas<ReplicaRequest> replicasProxy;
private final AtomicBoolean finished = new AtomicBoolean();
@ -80,10 +79,8 @@ public class ReplicationOperation<
public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
boolean executeOnReplicas, boolean checkActiveShardCount,
Replicas<ReplicaRequest> replicas,
boolean executeOnReplicas, Replicas<ReplicaRequest> replicas,
Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
this.checkActiveShardCount = checkActiveShardCount;
this.executeOnReplicas = executeOnReplicas;
this.replicasProxy = replicas;
this.primary = primary;
@ -95,7 +92,7 @@ public class ReplicationOperation<
}
public void execute() throws Exception {
final String activeShardCountFailure = checkActiveShardCount ? checkActiveShardCount() : null;
final String activeShardCountFailure = checkActiveShardCount();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
if (activeShardCountFailure != null) {

View File

@ -63,7 +63,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
/**
* The number of shard copies that must be active before proceeding with the replication action.
*/
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
protected ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
private long routedBasedOnClusterVersion = 0;

View File

@ -165,13 +165,6 @@ public abstract class TransportReplicationAction<
*/
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest);
/**
* True if the active shard count should be checked before proceeding with the replication action.
*/
protected boolean checkActiveShardCount() {
return true;
}
/**
* Cluster level block to check before request execution
*/
@ -354,7 +347,7 @@ public abstract class TransportReplicationAction<
Request request, ActionListener<PrimaryResult> listener,
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
executeOnReplicas, checkActiveShardCount(), replicasProxy, clusterService::state, logger, actionName
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName
);
}
}

View File

@ -136,7 +136,7 @@ public class ReplicationOperationTests extends ESTestCase {
Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, primaryTerm), listener, false, false,
new TestPrimary(primaryShard, primaryTerm), listener, false,
new TestReplicaProxy(), () -> state, logger, "test");
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
@ -281,7 +281,7 @@ public class ReplicationOperationTests extends ESTestCase {
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, primaryTerm),
listener, randomBoolean(), true, new TestReplicaProxy(), () -> state, logger, "test");
listener, randomBoolean(), new TestReplicaProxy(), () -> state, logger, "test");
if (passesActiveShardCheck) {
assertThat(op.checkActiveShardCount(), nullValue());
@ -329,6 +329,7 @@ public class ReplicationOperationTests extends ESTestCase {
this();
this.shardId = shardId;
this.index = shardId.getIndexName();
this.waitForActiveShards = ActiveShardCount.NONE;
// keep things simple
}
@ -440,13 +441,13 @@ public class ReplicationOperationTests extends ESTestCase {
class TestReplicationOperation extends ReplicationOperation<Request, Request, TestPrimary.Result> {
public TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, Supplier<ClusterState> clusterStateSupplier) {
this(request, primary, listener, true, false, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test");
this(request, primary, listener, true, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test");
}
public TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener, boolean executeOnReplicas, boolean checkActiveShardCount,
ActionListener<TestPrimary.Result> listener, boolean executeOnReplicas,
Replicas<Request> replicas, Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
super(request, primary, listener, executeOnReplicas, checkActiveShardCount, replicas, clusterStateSupplier, logger, opType);
super(request, primary, listener, executeOnReplicas, replicas, clusterStateSupplier, logger, opType);
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterState;
@ -719,6 +720,7 @@ public class TransportReplicationActionTests extends ESTestCase {
this();
this.shardId = shardId;
this.index = shardId.getIndexName();
this.waitForActiveShards = ActiveShardCount.NONE;
// keep things simple
}
@ -765,11 +767,6 @@ public class TransportReplicationActionTests extends ESTestCase {
return new ReplicaResult();
}
@Override
protected boolean checkActiveShardCount() {
return false;
}
@Override
protected boolean resolveIndex() {
return false;
@ -815,7 +812,7 @@ public class TransportReplicationActionTests extends ESTestCase {
class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult> {
public NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult> listener) {
super(request, null, listener, true, true, null, null, TransportReplicationActionTests.this.logger, "noop");
super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop");
}
@Override

View File

@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
@ -251,7 +252,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
public int indexDocs(final int numOfDoc) throws Exception {
for (int doc = 0; doc < numOfDoc; doc++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet()))
.source("{}");
.source("{}").waitForActiveShards(ActiveShardCount.NONE);
final IndexResponse response = index(indexRequest);
assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
}
@ -398,7 +399,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
private final ReplicationGroup replicationGroup;
public IndexingOp(IndexRequest request, ActionListener<IndexingResult> listener, ReplicationGroup replicationGroup) {
super(request, new PrimaryRef(replicationGroup), listener, true, false, new ReplicasRef(replicationGroup),
super(request, new PrimaryRef(replicationGroup), listener, true, new ReplicasRef(replicationGroup),
() -> null, logger, "indexing");
this.replicationGroup = replicationGroup;
request.process(null, true, request.index());