Change certain replica failures not to fail the replica shard

This changes the way that replica failures are handled such that not all
failures will cause the replica shard to be failed or marked as stale.

In some cases, such as refresh operations or global checkpoint syncs, it is
"okay" for the operation to fail without the shard being failed, because no data
is out of sync. In these cases, instead of failing the shard we should simply
fail the operation and, if it is a user-facing operation, return a 5xx response
code that includes the shard-specific failures.

This was accomplished by having two forms of the `Replicas` proxy: one for
non-write operations, which does not fail the shard, and one for write
operations, which fails the shard when a replica operation fails.
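
To make the behavior concrete, here is a minimal, self-contained sketch of the two proxy behaviours. It is illustrative only: the interface and class names below are invented stand-ins, not the actual Elasticsearch classes (ReplicasProxy / WriteActionReplicasProxy in the diff below), and the real write-path implementation reports failures to the master via ShardStateAction rather than printing.

import java.util.function.Consumer;

// Minimal stand-in for ReplicationOperation.Replicas#failShardIfNeeded.
interface ReplicaFailureHandler {
    void failShardIfNeeded(String shardId, String message, Exception cause,
                           Runnable onSuccess, Consumer<Exception> onIgnoredFailure);
}

// Non-write operations (refresh, global checkpoint sync): nothing is out of sync,
// so the failure is simply acknowledged and the replica stays in the in-sync set.
class NonWriteReplicasProxy implements ReplicaFailureHandler {
    @Override
    public void failShardIfNeeded(String shardId, String message, Exception cause,
                                  Runnable onSuccess, Consumer<Exception> onIgnoredFailure) {
        onSuccess.run();
    }
}

// Write operations: a replica that failed to apply a write may hold divergent data,
// so the failure is escalated (here just printed) before the callback is invoked.
class WriteReplicasProxy implements ReplicaFailureHandler {
    @Override
    public void failShardIfNeeded(String shardId, String message, Exception cause,
                                  Runnable onSuccess, Consumer<Exception> onIgnoredFailure) {
        System.out.println("failing shard " + shardId + ": " + message + " (" + cause + ")");
        onSuccess.run();
    }
}

class ProxySketch {
    public static void main(String[] args) {
        Exception cause = new RuntimeException("simulated replica failure");
        new NonWriteReplicasProxy().failShardIfNeeded("[index][0]", "refresh failed on replica", cause,
                () -> System.out.println("non-write: operation failed, shard left alone"), e -> {});
        new WriteReplicasProxy().failShardIfNeeded("[index][0]", "write failed on replica", cause,
                () -> System.out.println("write: shard failure reported"), e -> {});
    }
}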

Relates to #10708
Lee Hinman 2017-01-16 10:46:12 -07:00
parent b0c9759441
commit 39e7c30912
12 changed files with 533 additions and 191 deletions

View File

@ -167,7 +167,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]IndicesOptions.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]ToXContentToBytes.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]BroadcastOperationRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]BroadcastResponse.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]TransportBroadcastAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]AcknowledgedRequest.java" checks="LineLength" />

View File

@ -23,6 +23,8 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.index.shard.ShardNotFoundException;
import java.io.IOException;
import java.util.List;
@ -42,11 +44,22 @@ public class BroadcastResponse extends ActionResponse {
public BroadcastResponse() {
}
public BroadcastResponse(int totalShards, int successfulShards, int failedShards, List<? extends ShardOperationFailedException> shardFailures) {
public BroadcastResponse(int totalShards, int successfulShards, int failedShards,
List<? extends ShardOperationFailedException> shardFailures) {
assertNoShardNotAvailableFailures(shardFailures);
this.totalShards = totalShards;
this.successfulShards = successfulShards;
this.failedShards = failedShards;
this.shardFailures = shardFailures == null ? EMPTY : shardFailures.toArray(new ShardOperationFailedException[shardFailures.size()]);
this.shardFailures = shardFailures == null ? EMPTY :
shardFailures.toArray(new ShardOperationFailedException[shardFailures.size()]);
}
private void assertNoShardNotAvailableFailures(List<? extends ShardOperationFailedException> shardFailures) {
if (shardFailures != null) {
for (Object e : shardFailures) {
assert (e instanceof ShardNotFoundException) == false : "expected no ShardNotFoundException failures, but got " + e;
}
}
}
/**
@ -70,6 +83,17 @@ public class BroadcastResponse extends ActionResponse {
return failedShards;
}
/**
* The REST status that should be used for the response
*/
public RestStatus getStatus() {
if (failedShards > 0) {
return shardFailures[0].status();
} else {
return RestStatus.OK;
}
}
/**
* The list of shard failures exception.
*/
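
For context, the getStatus() method added above picks the response status from the first shard failure when any shard failed, and 200 OK otherwise. A rough, standalone illustration of that selection logic follows (hypothetical names; ShardFailure and the integer status codes are simplified stand-ins for ShardOperationFailedException and RestStatus):

import java.util.List;

class ShardFailure {
    final int status;                 // e.g. 503 for an unavailable shard
    ShardFailure(int status) { this.status = status; }
}

class StatusSketch {
    // First failure wins, otherwise the request succeeded on all shards.
    static int responseStatus(List<ShardFailure> failures) {
        return failures.isEmpty() ? 200 : failures.get(0).status;
    }

    public static void main(String[] args) {
        System.out.println(responseStatus(List.of()));                      // 200
        System.out.println(responseStatus(List.of(new ShardFailure(503)))); // 503
    }
}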

View File

@ -147,7 +147,7 @@ public class ReplicationOperation<
for (String allocationId : Sets.difference(inSyncAllocationIds, availableAllocationIds)) {
// mark copy as stale
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStale(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
@ -209,14 +209,9 @@ public class ReplicationOperation<
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
logger.warn(
(org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("[{}] {}", shard.shardId(), message), replicaException);
replicasProxy.failShard(shard, replicaRequest.primaryTerm(), message, replicaException,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
);
replicasProxy.failShardIfNeeded(shard, replicaRequest.primaryTerm(), message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
}
}
});
@ -314,10 +309,13 @@ public class ReplicationOperation<
}
}
/**
* An encapsulation of an operation that is to be performed on the primary shard
*/
public interface Primary<
Request extends ReplicationRequest<Request>,
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
PrimaryResultT extends PrimaryResult<ReplicaRequest>
RequestT extends ReplicationRequest<RequestT>,
ReplicaRequestT extends ReplicationRequest<ReplicaRequestT>,
PrimaryResultT extends PrimaryResult<ReplicaRequestT>
> {
/**
@ -338,7 +336,7 @@ public class ReplicationOperation<
* @param request the request to perform
* @return the request to send to the replicas
*/
PrimaryResultT perform(Request request) throws Exception;
PrimaryResultT perform(RequestT request) throws Exception;
/**
@ -355,7 +353,10 @@ public class ReplicationOperation<
long localCheckpoint();
}
public interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> {
/**
* An encapsulation of an operation that will be executed on the replica shards, if present.
*/
public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
/**
* performs the given request on the specified replica
@ -364,24 +365,29 @@ public class ReplicationOperation<
* @param replicaRequest operation to perform
* @param listener a callback to call once the operation has been completed, either successfully or with an error.
*/
void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener<ReplicaResponse> listener);
void performOn(ShardRouting replica, RequestT replicaRequest, ActionListener<ReplicaResponse> listener);
/**
* Fail the specified shard, removing it from the current set of active shards
* Fail the specified shard if needed, removing it from the current set
* of active shards. Whether a failure is needed is left up to the
* implementation.
*
* @param replica shard to fail
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but that failure can be safely ignored
*/
void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
/**
* Marks shard copy as stale, removing its allocation id from the set of in-sync allocation ids.
* Marks shard copy as stale if needed, removing its allocation id from
* the set of in-sync allocation ids. Whether marking as stale is needed
* is left up to the implementation.
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
@ -391,8 +397,8 @@ public class ReplicationOperation<
* by the master.
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
*/
void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
}
/**
@ -422,13 +428,13 @@ public class ReplicationOperation<
}
}
public interface PrimaryResult<R extends ReplicationRequest<R>> {
public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {
/**
* @return null if no operation needs to be sent to a replica
* (for example when the operation failed on the primary due to a parsing exception)
*/
@Nullable R replicaRequest();
@Nullable RequestT replicaRequest();
void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
}

View File

@ -99,15 +99,15 @@ public abstract class TransportReplicationAction<
private final TransportService transportService;
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
private final IndicesService indicesService;
private final ShardStateAction shardStateAction;
private final TransportRequestOptions transportOptions;
private final String executor;
// package private for testing
private final String transportReplicaAction;
private final String transportPrimaryAction;
private final ReplicasProxy replicasProxy;
private final ReplicationOperation.Replicas replicasProxy;
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
@ -135,7 +135,7 @@ public abstract class TransportReplicationAction<
this.transportOptions = transportOptions();
this.replicasProxy = new ReplicasProxy();
this.replicasProxy = newReplicasProxy();
}
@Override
@ -148,6 +148,10 @@ public abstract class TransportReplicationAction<
new ReroutePhase((ReplicationTask) task, request, listener).run();
}
protected ReplicationOperation.Replicas newReplicasProxy() {
return new ReplicasProxy();
}
protected abstract Response newResponseInstance();
/**
@ -369,8 +373,7 @@ public abstract class TransportReplicationAction<
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName
);
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName);
}
}
@ -1030,7 +1033,13 @@ public abstract class TransportReplicationAction<
}
}
final class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
/**
* The {@code ReplicasProxy} is an implementation of the {@code Replicas}
* interface that performs the actual {@code ReplicaRequest} on the replica
* shards. It also encapsulates the logic required for failing the replica
* if deemed necessary as well as marking it as stale when needed.
*/
class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
@Override
public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener<ReplicationOperation.ReplicaResponse> listener) {
@ -1041,45 +1050,28 @@ public abstract class TransportReplicationAction<
return;
}
final ConcreteShardRequest<ReplicaRequest> concreteShardRequest =
new ConcreteShardRequest<>(request, replica.allocationId().getId());
new ConcreteShardRequest<>(request, replica.allocationId().getId());
sendReplicaRequest(concreteShardRequest, node, listener);
}
@Override
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
// This does not need to fail the shard. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to fail.
onSuccess.run();
}
@Override
public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onSuccess.run();
}
@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onPrimaryDemoted.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
}
};
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
// This does not need to make the shard stale. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to be marked as stale.
onSuccess.run();
}
}

View File

@ -20,29 +20,38 @@
package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.apache.logging.log4j.core.pattern.ConverterKeys;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
@ -63,6 +72,11 @@ public abstract class TransportWriteAction<
indexNameExpressionResolver, request, replicaRequest, executor);
}
@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
return new WriteActionReplicasProxy();
}
/**
* Called on the primary with a reference to the primary {@linkplain IndexShard} to modify.
*
@ -311,4 +325,55 @@ public abstract class TransportWriteAction<
}
}
}
/**
* A proxy for <b>write</b> operations that need to be performed on the
* replicas, where a failure to execute the operation should fail
* the replica shard and/or mark the replica as stale.
*
* This extends {@code TransportReplicationAction.ReplicasProxy} to do the
* failing and stale-ing.
*/
class WriteActionReplicasProxy extends ReplicasProxy {
@Override
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
logger.warn((org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
public ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onSuccess.run();
}
@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onPrimaryDemoted.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
}
};
}
}
}

View File

@ -60,7 +60,7 @@ public class RestRefreshAction extends BaseRestHandler {
builder.startObject();
buildBroadcastShardsHeader(builder, request, response);
builder.endObject();
return new BytesRestResponse(OK, builder);
return new BytesRestResponse(response.getStatus(), builder);
}
});
}

View File

@ -190,11 +190,11 @@ public class ReplicationOperationTests extends ESTestCase {
final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean();
final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) {
@Override
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
Consumer<Exception> onIgnoredFailure) {
if (testPrimaryDemotedOnStaleShardCopies) {
super.failShard(replica, primaryTerm, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure);
super.failShardIfNeeded(replica, primaryTerm, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure);
} else {
assertThat(replica, equalTo(failedReplica));
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
@ -202,12 +202,12 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
if (testPrimaryDemotedOnStaleShardCopies) {
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
} else {
super.markShardCopyAsStale(shardId, allocationId, primaryTerm, onSuccess, onPrimaryDemoted, onIgnoredFailure);
super.markShardCopyAsStaleIfNeeded(shardId, allocationId, primaryTerm, onSuccess, onPrimaryDemoted, onIgnoredFailure);
}
}
};
@ -486,8 +486,8 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
if (failedReplicas.add(replica) == false) {
fail("replica [" + replica + "] was failed twice");
}
@ -503,8 +503,8 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
if (markedAsStaleCopies.add(allocationId) == false) {
fail("replica [" + allocationId + "] was marked as stale twice");
}

View File

@ -135,7 +135,7 @@ public class TransportReplicationActionTests extends ESTestCase {
private ClusterService clusterService;
private TransportService transportService;
private CapturingTransport transport;
private Action action;
private TestAction action;
private ShardStateAction shardStateAction;
/* *
@ -159,7 +159,7 @@ public class TransportReplicationActionTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool);
action = new TestAction(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool);
}
@After
@ -185,9 +185,10 @@ public class TransportReplicationActionTests extends ESTestCase {
public void testBlocks() throws ExecutionException, InterruptedException {
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) {
TestAction action = new TestAction(Settings.EMPTY, "testActionWithBlocks",
transportService, clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.WRITE;
@ -197,7 +198,7 @@ public class TransportReplicationActionTests extends ESTestCase {
ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
assertPhase(task, "failed");
@ -226,7 +227,7 @@ public class TransportReplicationActionTests extends ESTestCase {
ClusterBlockException.class);
assertIndexShardUninitialized();
action = new Action(Settings.EMPTY, "testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool) {
action = new TestAction(Settings.EMPTY, "testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel globalBlockLevel() {
return null;
@ -253,8 +254,8 @@ public class TransportReplicationActionTests extends ESTestCase {
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(shardId).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class);
assertPhase(task, "failed");
@ -301,8 +302,8 @@ public class TransportReplicationActionTests extends ESTestCase {
logger.debug("--> relocation ongoing state:\n{}", clusterService.state());
Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
Action.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
reroutePhase.run();
assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class);
assertTrue(request.isRetrySet.compareAndSet(true, false));
@ -340,10 +341,10 @@ public class TransportReplicationActionTests extends ESTestCase {
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class);
assertPhase(task, "failed");
@ -364,17 +365,18 @@ public class TransportReplicationActionTests extends ESTestCase {
ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index)));
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(new ShardId("test", "_na_", 0)).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
ClusterBlockLevel indexBlockLevel = randomBoolean() ? ClusterBlockLevel.WRITE : null;
Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) {
TestAction action = new TestAction(Settings.EMPTY, "testActionWithBlocks", transportService,
clusterService, shardStateAction, threadPool) {
@Override
protected ClusterBlockLevel indexBlockLevel() {
return indexBlockLevel;
}
};
Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
if (indexBlockLevel == ClusterBlockLevel.WRITE) {
assertListenerThrows("must throw block exception", listener, ClusterBlockException.class);
@ -398,10 +400,10 @@ public class TransportReplicationActionTests extends ESTestCase {
} else {
request.timeout("1h");
}
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests, arrayWithSize(1));
@ -452,9 +454,9 @@ public class TransportReplicationActionTests extends ESTestCase {
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
Request request = new Request(shardId);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertThat(request.shardId(), equalTo(shardId));
logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId);
@ -479,7 +481,7 @@ public class TransportReplicationActionTests extends ESTestCase {
ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
setState(clusterService, state);
Request request = new Request(shardId).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
AtomicBoolean executed = new AtomicBoolean();
@ -492,11 +494,12 @@ public class TransportReplicationActionTests extends ESTestCase {
}
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) {
@Override
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
createReplicatedOperation(Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
createReplicatedOperation(
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
assertPhase(task, "primary");
@ -521,7 +524,7 @@ public class TransportReplicationActionTests extends ESTestCase {
assertThat(requests.size(), equalTo(1));
assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("testAction[p]"));
assertPhase(task, "primary_delegation");
transport.handleResponse(requests.get(0).requestId, new Response());
transport.handleResponse(requests.get(0).requestId, new TestResponse());
assertTrue(listener.isDone());
listener.get();
assertPhase(task, "finished");
@ -539,16 +542,17 @@ public class TransportReplicationActionTests extends ESTestCase {
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryTargetNodeId)).build();
setState(clusterService, state);
Request request = new Request(shardId).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
AtomicBoolean executed = new AtomicBoolean();
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), createTransportChannel(listener), task) {
@Override
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
createReplicatedOperation(Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
createReplicatedOperation(
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
assertPhase(task, "primary");
@ -579,7 +583,7 @@ public class TransportReplicationActionTests extends ESTestCase {
fail("releasable is closed twice");
}
};
Action.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable);
TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable);
final Request request = new Request();
Request replicaRequest = (Request) primary.perform(request).replicaRequest;
@ -596,7 +600,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}
public void testReplicaProxy() throws InterruptedException, ExecutionException {
Action.ReplicasProxy proxy = action.new ReplicasProxy();
ReplicationOperation.Replicas proxy = action.newReplicasProxy();
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2));
@ -636,43 +640,15 @@ public class TransportReplicationActionTests extends ESTestCase {
assertListenerThrows("listener should reflect remote error", listener, TransportException.class);
}
AtomicReference<Throwable> failure = new AtomicReference<>();
AtomicReference<Throwable> ignoredFailure = new AtomicReference<>();
AtomicReference<Object> failure = new AtomicReference<>();
AtomicReference<Object> ignoredFailure = new AtomicReference<>();
AtomicBoolean success = new AtomicBoolean();
proxy.failShard(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"),
() -> success.set(true), failure::set, ignoredFailure::set
proxy.failShardIfNeeded(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"),
() -> success.set(true), failure::set, ignoredFailure::set
);
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, shardFailedRequests.length);
CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request;
// the shard the request was sent to and the shard to be failed should be the same
assertEquals(shardEntry.getShardId(), replica.shardId());
assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId());
if (randomBoolean()) {
// simulate success
transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
assertTrue(success.get());
assertNull(failure.get());
assertNull(ignoredFailure.get());
} else if (randomBoolean()) {
// simulate the primary has been demoted
transport.handleRemoteError(shardFailedRequest.requestId,
new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(),
"shard-failed-test"));
assertFalse(success.get());
assertNotNull(failure.get());
assertNull(ignoredFailure.get());
} else {
// simulated an "ignored" exception
transport.handleRemoteError(shardFailedRequest.requestId,
new NodeClosedException(state.nodes().getLocalNode()));
assertFalse(success.get());
assertNull(failure.get());
assertNotNull(ignoredFailure.get());
}
// A replication action does not fail the shard
assertEquals(0, shardFailedRequests.length);
}
public void testShadowIndexDisablesReplication() throws Exception {
@ -691,9 +667,9 @@ public class TransportReplicationActionTests extends ESTestCase {
action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(),
createTransportChannel(new PlainActionFuture<>()), null) {
@Override
protected ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> createReplicatedOperation(
Request request, ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
protected ReplicationOperation<Request, Request, TestAction.PrimaryResult<Request, TestResponse>> createReplicatedOperation(
Request request, ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
assertFalse(executeOnReplicas);
assertFalse(executed.getAndSet(true));
@ -715,7 +691,7 @@ public class TransportReplicationActionTests extends ESTestCase {
Request request = new Request(shardId);
TransportReplicationAction.ConcreteShardRequest<Request> concreteShardRequest =
new TransportReplicationAction.ConcreteShardRequest<>(request, routingEntry.allocationId().getId());
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final IndexShard shard = mock(IndexShard.class);
@ -730,10 +706,10 @@ public class TransportReplicationActionTests extends ESTestCase {
}
};
Action action =
new Action(Settings.EMPTY, "testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool);
TestAction action =
new TestAction(Settings.EMPTY, "testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool);
TransportReplicationAction<Request, Request, Response>.PrimaryOperationTransportHandler primaryPhase =
TransportReplicationAction<Request, Request, TestResponse>.PrimaryOperationTransportHandler primaryPhase =
action.new PrimaryOperationTransportHandler();
primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null);
CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests();
@ -751,7 +727,7 @@ public class TransportReplicationActionTests extends ESTestCase {
logger.debug("--> using initial state:\n{}", clusterService.state());
final ShardRouting primaryShard = state.routingTable().shardRoutingTable(shardId).primaryShard();
Request request = new Request(shardId);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
int i = randomInt(3);
final boolean throwExceptionOnCreation = i == 1;
@ -759,11 +735,12 @@ public class TransportReplicationActionTests extends ESTestCase {
final boolean respondWithError = i == 3;
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) {
@Override
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
createReplicatedOperation(Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, TestResponse>>
createReplicatedOperation(
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
assertIndexShardCounter(1);
if (throwExceptionOnCreation) {
throw new ElasticsearchException("simulated exception, during createReplicatedOperation");
@ -808,7 +785,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
boolean throwException = randomBoolean();
final ReplicationTask task = maybeTask();
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
@ -820,7 +797,7 @@ public class TransportReplicationActionTests extends ESTestCase {
return new ReplicaResult();
}
};
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
try {
replicaOperationTransportHandler.messageReceived(
new TransportReplicationAction.ConcreteShardRequest<>(
@ -871,7 +848,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
setState(clusterService, state(index, true, ShardRoutingState.STARTED));
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
Request request = new Request(shardId).timeout("1ms");
action.new PrimaryOperationTransportHandler().messageReceived(
new TransportReplicationAction.ConcreteShardRequest<>(request, "_not_a_valid_aid_"),
@ -897,7 +874,7 @@ public class TransportReplicationActionTests extends ESTestCase {
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(replica.currentNodeId())).build();
setState(clusterService, state);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
Request request = new Request(shardId).timeout("1ms");
action.new ReplicaOperationTransportHandler().messageReceived(
new TransportReplicationAction.ConcreteShardRequest<>(request, "_not_a_valid_aid_"),
@ -928,7 +905,7 @@ public class TransportReplicationActionTests extends ESTestCase {
setState(clusterService, state);
AtomicBoolean throwException = new AtomicBoolean(true);
final ReplicationTask task = maybeTask();
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
@ -939,8 +916,8 @@ public class TransportReplicationActionTests extends ESTestCase {
return new ReplicaResult();
}
};
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
final PlainActionFuture<Response> listener = new PlainActionFuture<>();
final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final Request request = new Request().setShardId(shardId);
request.primaryTerm(state.metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));
replicaOperationTransportHandler.messageReceived(
@ -1047,31 +1024,46 @@ public class TransportReplicationActionTests extends ESTestCase {
}
}
static class Response extends ReplicationResponse {
static class TestResponse extends ReplicationResponse {
}
class Action extends TransportReplicationAction<Request, Request, Response> {
private class TestAction extends TransportReplicationAction<Request, Request, TestResponse> {
Action(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService,
ShardStateAction shardStateAction,
ThreadPool threadPool) {
private final boolean withDocumentFailureOnPrimary;
private final boolean withDocumentFailureOnReplica;
TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
Request::new, Request::new, ThreadPool.Names.SAME);
this.withDocumentFailureOnPrimary = false;
this.withDocumentFailureOnReplica = false;
}
TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction,
ThreadPool threadPool, boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
Request::new, Request::new, ThreadPool.Names.SAME);
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
@Override
protected Response newResponseInstance() {
return new Response();
protected TestResponse newResponseInstance() {
return new TestResponse();
}
@Override
protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception {
boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true);
assert executedBefore == false : "request has already been executed on the primary";
return new PrimaryResult(shardRequest, new Response());
return new PrimaryResult(shardRequest, new TestResponse());
}
@Override
@ -1156,22 +1148,23 @@ public class TransportReplicationActionTests extends ESTestCase {
return indexShard;
}
class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> {
NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult<Request, Response>> listener) {
class NoopReplicationOperation extends ReplicationOperation<Request, Request, TestAction.PrimaryResult<Request, TestResponse>> {
NoopReplicationOperation(Request request, ActionListener<TestAction.PrimaryResult<Request, TestResponse>> listener) {
super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop");
}
@Override
public void execute() throws Exception {
// Using the diamond operator (<>) prevents Eclipse from being able to compile this code
this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<Request, Response>(null, new Response()));
this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<Request, TestResponse>(null, new TestResponse()));
}
}
/**
* Transport channel that is needed for replica operation testing.
*/
public TransportChannel createTransportChannel(final PlainActionFuture<Response> listener) {
public TransportChannel createTransportChannel(final PlainActionFuture<TestResponse> listener) {
return new TransportChannel() {
@Override
@ -1186,12 +1179,12 @@ public class TransportReplicationActionTests extends ESTestCase {
@Override
public void sendResponse(TransportResponse response) throws IOException {
listener.onResponse(((Response) response));
listener.onResponse(((TestResponse) response));
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
listener.onResponse(((Response) response));
listener.onResponse(((TestResponse) response));
}
@Override

View File

@ -19,38 +19,117 @@
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.mockito.ArgumentCaptor;
import java.util.HashSet;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TransportWriteActionTests extends ESTestCase {
private static ThreadPool threadPool;
private ClusterService clusterService;
private IndexShard indexShard;
private Translog.Location location;
@BeforeClass
public static void beforeClass() {
threadPool = new TestThreadPool("ShardReplicationTests");
}
@Before
public void initCommonMocks() {
indexShard = mock(IndexShard.class);
location = mock(Translog.Location.class);
clusterService = createClusterService(threadPool);
}
@After
public void tearDown() throws Exception {
super.tearDown();
clusterService.close();
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
<T> void assertListenerThrows(String msg, PlainActionFuture<T> listener, Class<?> klass) throws InterruptedException {
try {
listener.get();
fail(msg);
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(klass));
}
}
public void testPrimaryNoRefreshCall() throws Exception {
@ -176,6 +255,95 @@ public class TransportWriteActionTests extends ESTestCase {
assertNotNull(listener.failure);
}
public void testReplicaProxy() throws InterruptedException, ExecutionException {
CapturingTransport transport = new CapturingTransport();
TransportService transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
ShardStateAction shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
TestAction action = new TestAction(Settings.EMPTY, "testAction", transportService,
clusterService, shardStateAction, threadPool);
ReplicationOperation.Replicas proxy = action.newReplicasProxy();
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2));
logger.info("using state: {}", state);
ClusterServiceUtils.setState(clusterService, state);
// check that a request to an unknown node fails
PlainActionFuture<ReplicaResponse> listener = new PlainActionFuture<>();
proxy.performOn(
TestShardRouting.newShardRouting(shardId, "NOT THERE", false, randomFrom(ShardRoutingState.values())),
new TestRequest(), listener);
assertTrue(listener.isDone());
assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class);
final IndexShardRoutingTable shardRoutings = state.routingTable().shardRoutingTable(shardId);
final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream()
.filter(ShardRouting::assignedToNode).collect(Collectors.toList()));
listener = new PlainActionFuture<>();
proxy.performOn(replica, new TestRequest(), listener);
assertFalse(listener.isDone());
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
assertThat(captures, arrayWithSize(1));
if (randomBoolean()) {
final TransportReplicationAction.ReplicaResponse response =
new TransportReplicationAction.ReplicaResponse(randomAsciiOfLength(10), randomLong());
transport.handleResponse(captures[0].requestId, response);
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));
} else if (randomBoolean()) {
transport.handleRemoteError(captures[0].requestId, new ElasticsearchException("simulated"));
assertTrue(listener.isDone());
assertListenerThrows("listener should reflect remote error", listener, ElasticsearchException.class);
} else {
transport.handleError(captures[0].requestId, new TransportException("simulated"));
assertTrue(listener.isDone());
assertListenerThrows("listener should reflect remote error", listener, TransportException.class);
}
AtomicReference<Object> failure = new AtomicReference<>();
AtomicReference<Object> ignoredFailure = new AtomicReference<>();
AtomicBoolean success = new AtomicBoolean();
proxy.failShardIfNeeded(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"),
() -> success.set(true), failure::set, ignoredFailure::set
);
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
// A write replication action proxy should fail the shard
assertEquals(1, shardFailedRequests.length);
CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request;
// the shard the request was sent to and the shard to be failed should be the same
assertEquals(shardEntry.getShardId(), replica.shardId());
assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId());
if (randomBoolean()) {
// simulate success
transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
assertTrue(success.get());
assertNull(failure.get());
assertNull(ignoredFailure.get());
} else if (randomBoolean()) {
// simulate the primary has been demoted
transport.handleRemoteError(shardFailedRequest.requestId,
new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(),
"shard-failed-test"));
assertFalse(success.get());
assertNotNull(failure.get());
assertNull(ignoredFailure.get());
} else {
// simulate an "ignored" exception
transport.handleRemoteError(shardFailedRequest.requestId,
new NodeClosedException(state.nodes().getLocalNode()));
assertFalse(success.get());
assertNull(failure.get());
assertNotNull(ignoredFailure.get());
}
}
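The test above exercises the write-path replicas proxy, which escalates a replica failure by sending a shard-failed request to the master. As a point of contrast, here is a simplified standalone sketch (illustrative classes only, not the actual Elasticsearch proxies) of the two behaviors this change introduces: the write proxy escalates the failure, while the non-write proxy leaves cluster state alone and only lets the operation itself fail.
import java.util.function.Consumer;
/**
 * Standalone sketch of the two replica-failure behaviors described by this change:
 * write operations escalate a replica failure to the master, while non-write
 * operations (refresh, global checkpoint sync) only fail the operation itself.
 */
public class ReplicasProxySketch {
    interface ReplicaFailureHandler {
        void failShardIfNeeded(String shard, Exception failure,
                               Runnable onSuccess, Consumer<Exception> onFailure);
    }
    /** Write path: a failed replica may hold divergent data, so the shard must be failed. */
    static class WriteReplicasProxy implements ReplicaFailureHandler {
        @Override
        public void failShardIfNeeded(String shard, Exception failure,
                                      Runnable onSuccess, Consumer<Exception> onFailure) {
            // The real proxy sends a shard-failed request to the master via ShardStateAction.
            System.out.println("shard-failed sent for " + shard + " because of " + failure);
            onSuccess.run();
        }
    }
    /** Non-write path: no data is out of sync, so do not fail the shard. */
    static class NonWriteReplicasProxy implements ReplicaFailureHandler {
        @Override
        public void failShardIfNeeded(String shard, Exception failure,
                                      Runnable onSuccess, Consumer<Exception> onFailure) {
            // Intentionally a no-op with respect to cluster state: just acknowledge the callback.
            onSuccess.run();
        }
    }
    public static void main(String[] args) {
        Exception simulated = new RuntimeException("simulated replica failure");
        new WriteReplicasProxy().failShardIfNeeded("[test][0]", simulated,
                () -> System.out.println("write path acknowledged"), e -> {});
        new NonWriteReplicasProxy().failShardIfNeeded("[test][0]", simulated,
                () -> System.out.println("non-write path acknowledged"), e -> {});
    }
}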
private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
private final boolean withDocumentFailureOnPrimary;
@ -184,6 +352,7 @@ public class TransportWriteActionTests extends ESTestCase {
protected TestAction() {
this(false, false);
}
protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
super(Settings.EMPTY, "test",
new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null), null,
@ -193,6 +362,17 @@ public class TransportWriteActionTests extends ESTestCase {
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
protected TestAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService,
mockIndicesService(clusterService), threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
this.withDocumentFailureOnPrimary = false;
this.withDocumentFailureOnReplica = false;
}
@Override
protected TestResponse newResponseInstance() {
return new TestResponse();
@ -222,6 +402,80 @@ public class TransportWriteActionTests extends ESTestCase {
}
}
final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(anyInt())).then(invocation -> {
int shard = (Integer) invocation.getArguments()[0];
final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard);
if (shard >= indexMetaData.getNumberOfShards()) {
throw new ShardNotFoundException(shardId);
}
return mockIndexShard(shardId, clusterService);
});
return indexService;
}
final IndicesService mockIndicesService(ClusterService clusterService) {
final IndicesService indicesService = mock(IndicesService.class);
when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> {
Index index = (Index)invocation.getArguments()[0];
final ClusterState state = clusterService.state();
final IndexMetaData indexSafe = state.metaData().getIndexSafe(index);
return mockIndexService(indexSafe, clusterService);
});
when(indicesService.indexService(any(Index.class))).then(invocation -> {
Index index = (Index) invocation.getArguments()[0];
final ClusterState state = clusterService.state();
if (state.metaData().hasIndex(index.getName())) {
final IndexMetaData indexSafe = state.metaData().getIndexSafe(index);
return mockIndexService(indexSafe, clusterService);
} else {
return null;
}
});
return indicesService;
}
private final AtomicInteger count = new AtomicInteger(0);
private final AtomicBoolean isRelocated = new AtomicBoolean(false);
private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
final IndexShard indexShard = mock(IndexShard.class);
doAnswer(invocation -> {
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString());
doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
final long primaryTerm = indexShard.getPrimaryTerm();
if (term < primaryTerm) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
shardId, term, primaryTerm));
}
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
final ShardRouting routing = node.getByShardId(shardId);
if (routing == null) {
throw new ShardNotFoundException(shardId, "shard is no longer assigned to current node");
}
return routing;
});
when(indexShard.state()).thenAnswer(invocationOnMock -> isRelocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED);
doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class));
when(indexShard.getPrimaryTerm()).thenAnswer(i ->
clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));
return indexShard;
}
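As a usage note, a minimal sketch, assuming it runs inside the test class above so that mockIndexShard and clusterService refer to the members defined there: because the mock completes the listener inline, the operation lock can be acquired and released synchronously through a PlainActionFuture.
// Sketch: acquire the mocked primary operation lock synchronously.
// Assumes this runs inside the test class above (mockIndexShard, clusterService).
IndexShard shard = mockIndexShard(new ShardId("test", "_na_", 0), clusterService);
PlainActionFuture<Releasable> lockFuture = new PlainActionFuture<>();
shard.acquirePrimaryOperationLock(lockFuture, ThreadPool.Names.SAME);
try (Releasable ignored = lockFuture.actionGet()) {
    // the mocked counter has been incremented; it is decremented again on close
}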
private static class TestRequest extends ReplicatedWriteRequest<TestRequest> {
TestRequest() {
setShardId(new ShardId("test", "test", 1));

View File

@ -339,9 +339,11 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
// TODO: remove once refresh doesn't fail immediately if there a master block:
// https://github.com/elastic/elasticsearch/issues/9997
client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get();
// client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get();
logger.info("--> refreshing all indices after indexing is complete");
client().admin().indices().prepareRefresh().execute().actionGet();
logger.info("--> checking if documents exist, there should be 3");
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3);
}

View File

@ -429,14 +429,15 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
@Override
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
Consumer<Exception> onIgnoredFailure) {
throw new UnsupportedOperationException();
}
@Override
public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
throw new UnsupportedOperationException();
}
}
@ -519,9 +520,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger);
}
class GlobalCheckpointSync extends
ReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest, GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
class GlobalCheckpointSync extends ReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest,
GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
GlobalCheckpointSync(ActionListener<ReplicationResponse> listener, ReplicationGroup replicationGroup) {
super(new GlobalCheckpointSyncAction.PrimaryRequest(replicationGroup.getPrimary().shardId()), listener,
@ -529,7 +529,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
@Override
protected PrimaryResult performOnPrimary(IndexShard primary, GlobalCheckpointSyncAction.PrimaryRequest request) throws Exception {
protected PrimaryResult performOnPrimary(IndexShard primary,
GlobalCheckpointSyncAction.PrimaryRequest request) throws Exception {
return new PrimaryResult(new GlobalCheckpointSyncAction.ReplicaRequest(request, primary.getGlobalCheckpoint()),
new ReplicationResponse());
}

View File

@ -34,7 +34,13 @@ The `ignore_unavailable` and `allow_no_indices` options are no longer accepted
as they could cause undesired results when their values differed from their
defaults.
=== `timestamp` and `ttl` in index requests
==== `timestamp` and `ttl` in index requests
`timestamp` and `ttl` are not accepted anymore as parameters of index/update
requests.
requests.
==== Refresh requests with one or more shard failures return HTTP 500 response instead of 200
Refresh requests that are broadcast to multiple shards can now have one or more
shards fail during the request. If at least one shard fails, the request returns a
500 response instead of a 200 response.
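As a sketch of how a caller can react to the new behavior (assuming an ESIntegTestCase-style client() helper; the accessors are the standard BroadcastResponse ones), the per-shard failures that accompany the 5xx response can be inspected instead of assuming the refresh succeeded:
// Sketch: inspect shard-level refresh failures rather than assuming success.
// Assumes an ESIntegTestCase-style client() helper and logger field.
RefreshResponse refresh = client().admin().indices().prepareRefresh("test").get();
if (refresh.getFailedShards() > 0) {
    for (ShardOperationFailedException failure : refresh.getShardFailures()) {
        logger.warn("refresh failed on shard [{}] of index [{}]: {}",
            failure.shardId(), failure.index(), failure.reason());
    }
}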