Use primary terms as authority to fail shards (#19715)

A primary shard currently instructs the master to fail any replica shard to which it failed to replicate a write, before acknowledging that write to the client. To ensure that the primary instructing the master is still the current primary in the cluster state on the master, it submits not only the identity of the replica shard to fail but also its own shard identity. However, this is problematic when the primary is relocating: after the relocation handoff, but before the relocation target is activated, the target replicates writes under the authority of the relocation source. This suggests that the relocation target should send the identity of the relocation source as authority, but that is not good enough either, because primary shard activation and shard failure instructions can arrive out of order. The relocation target would therefore have to send both the source and target identities as authority. Fortunately, there is another concept in the cluster state that represents exactly this joint authority: the primary term. The primary term is only increased on initial assignment or when a replica is promoted; it stays the same when a primary relocates.
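
To make the new authority model concrete, here is a minimal sketch (plain Java with stand-in names, not the Elasticsearch classes) of the term check this commit introduces: a request carries the sender's primary term, and the master honors it only while that term matches the term recorded in the cluster state. A term of 0 is the convention used below for local requests that carry no primary authority.

final class PrimaryTermAuthority {
    // Sketch: termInClusterState stands in for what IndexMetaData.primaryTerm(shardId)
    // returns in the real code; termInRequest is what the sender recorded.
    static boolean isCurrentAuthority(long termInClusterState, long termInRequest) {
        // 0 marks a local request that needs no primary authority
        return termInRequest == 0L || termInRequest == termInClusterState;
    }

    public static void main(String[] args) {
        long current = 3L;
        System.out.println(isCurrentAuthority(current, 3L)); // true: still the current primary
        System.out.println(isCurrentAuthority(current, 2L)); // false: stale (e.g. demoted) primary
        System.out.println(isCurrentAuthority(current, 0L)); // true: local notification, no term check
    }
}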

This commit changes ShardStateAction to rely on primary terms for shard authority. It also changes the wire format to transmit only the ShardId and allocation id of the shard to fail (instead of the full ShardRouting), so that the same action can be used in a subsequent PR to remove allocation ids from the active allocation set for which no ShardRouting exists in the cluster anymore. Last but not least, this commit makes AllocationService less lenient: ShardRouting instances passed to its applyStartedShards and applyFailedShards methods must exist in the routing table. ShardStateAction, which calls these methods, is now responsible for resolving the ShardRouting objects that are to be started or failed, and for removing duplicates.
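
As a rough illustration of that last responsibility, the sketch below (stand-in types and hypothetical names; the real logic operates on ShardRouting and lives in the cluster state task executors shown further down) resolves incoming (shardId, allocationId) tasks against a lookup table and drops duplicates before anything reaches the allocation service.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ResolveAndDedupSketch {
    // tasks are {shardId, allocationId} pairs; routingByAllocationId stands in
    // for RoutingTable.getByAllocationId, returning null for unknown shards
    static List<String> resolve(List<String[]> tasks, Map<String, String> routingByAllocationId) {
        Set<String> seenRoutings = new HashSet<>(); // to prevent duplicates
        List<String> routingsToApply = new ArrayList<>();
        for (String[] task : tasks) {
            String routing = routingByAllocationId.get(task[1]);
            if (routing == null) {
                continue; // shard no longer exists: task is marked successful, nothing is applied
            }
            if (seenRoutings.add(routing)) {
                routingsToApply.add(routing); // first occurrence wins; duplicates only ride along
            }
        }
        return routingsToApply;
    }
}
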
Yannick Welsch, 2016-08-04 12:00:37 +02:00 (committed by GitHub)
parent d327dd46b1
commit ede78ad231
27 changed files with 476 additions and 478 deletions

View File: ReplicationOperation.java

@@ -136,7 +136,7 @@ public class ReplicationOperation<
}
if (shard.relocating() && shard.relocatingNodeId().equals(localNodeId) == false) {
performOnReplica(shard.buildTargetRelocatingShard(), replicaRequest);
performOnReplica(shard.getTargetRelocatingShard(), replicaRequest);
}
}
}
@@ -167,7 +167,7 @@ public class ReplicationOperation<
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
logger.warn("[{}] {}", replicaException, shard.shardId(), message);
replicasProxy.failShard(shard, primary.routingEntry(), message, replicaException,
replicasProxy.failShard(shard, replicaRequest.primaryTerm(), message, replicaException,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
@@ -327,7 +327,7 @@ public class ReplicationOperation<
/**
* Fail the specified shard, removing it from the current set of active shards
* @param replica shard to fail
* @param primary the primary shard that requested the failure
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
@@ -335,7 +335,7 @@ public class ReplicationOperation<
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but that failure can be safely ignored and the replication operation can finish processing
*/
void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception, Runnable onSuccess,
void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
}

View File: TransportReplicationAction.java

@@ -866,10 +866,10 @@ public abstract class TransportReplicationAction<
}
@Override
public void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception,
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onFailure, Consumer<Exception> onIgnoredFailure) {
shardStateAction.shardFailed(
replica, primary, message, exception,
shardStateAction.remoteShardFailed(
replica, primaryTerm, message, exception,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {

View File: ShardStateAction.java

@@ -29,9 +29,8 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -64,11 +63,10 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Set;
public class ShardStateAction extends AbstractComponent {
@@ -87,19 +85,19 @@ public class ShardStateAction extends AbstractComponent {
this.clusterService = clusterService;
this.threadPool = threadPool;
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
}
private void sendShardAction(final String actionName, final ClusterStateObserver observer, final ShardRoutingEntry shardRoutingEntry, final Listener listener) {
private void sendShardAction(final String actionName, final ClusterStateObserver observer, final ShardEntry shardEntry, final Listener listener) {
DiscoveryNode masterNode = observer.observedState().nodes().getMasterNode();
if (masterNode == null) {
logger.warn("{} no master known for action [{}] for shard [{}]", shardRoutingEntry.getShardRouting().shardId(), actionName, shardRoutingEntry.getShardRouting());
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
logger.warn("{} no master known for action [{}] for shard entry [{}]", shardEntry.shardId, actionName, shardEntry);
waitForNewMasterAndRetry(actionName, observer, shardEntry, listener);
} else {
logger.debug("{} sending [{}] to [{}] for shard [{}]", shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode.getId(), shardRoutingEntry);
logger.debug("{} sending [{}] to [{}] for shard entry [{}]", shardEntry.shardId, actionName, masterNode.getId(), shardEntry);
transportService.sendRequest(masterNode,
actionName, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
actionName, shardEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
@@ -108,9 +106,9 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
if (isMasterChannelException(exp)) {
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
waitForNewMasterAndRetry(actionName, observer, shardEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry);
logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard entry [{}]", exp, shardEntry.shardId, actionName, masterNode, shardEntry);
listener.onFailure(exp instanceof RemoteTransportException ? (Exception) (exp.getCause() instanceof Exception ? exp.getCause() : new ElasticsearchException(exp.getCause())) : exp);
}
}
@@ -129,34 +127,46 @@ public class ShardStateAction extends AbstractComponent {
}
/**
* Send a shard failed request to the master node to update the
* cluster state.
* @param shardRouting the shard to fail
* @param sourceShardRouting the source shard requesting the failure (must be the shard itself, or the primary shard)
* Send a shard failed request to the master node to update the cluster state with the failure of a shard on another node.
*
* @param shardRouting the shard to fail
* @param primaryTerm the primary term associated with the primary shard that is failing the shard.
* @param message the reason for the failure
* @param failure the underlying cause of the failure
* @param listener callback upon completion of the request
*/
public void shardFailed(final ShardRouting shardRouting, ShardRouting sourceShardRouting, final String message, @Nullable final Exception failure, Listener listener) {
public void remoteShardFailed(final ShardRouting shardRouting, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) {
assert primaryTerm > 0L : "primary term should be strictly positive";
shardFailed(shardRouting, primaryTerm, message, failure, listener);
}
/**
* Send a shard failed request to the master node to update the cluster state when a shard on the local node failed.
*/
public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure, Listener listener) {
shardFailed(shardRouting, 0L, message, failure, listener);
}
private void shardFailed(final ShardRouting shardRouting, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, sourceShardRouting, message, failure);
sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardRoutingEntry, listener);
ShardEntry shardEntry = new ShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message, failure);
sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardEntry, listener);
}
// visible for testing
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener) {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
if (logger.isTraceEnabled()) {
logger.trace("new cluster state [{}] after waiting for master election to fail shard [{}]", state.prettyPrint(), shardRoutingEntry);
logger.trace("new cluster state [{}] after waiting for master election to fail shard entry [{}]", state.prettyPrint(), shardEntry);
}
sendShardAction(actionName, observer, shardRoutingEntry, listener);
sendShardAction(actionName, observer, shardEntry, listener);
}
@Override
public void onClusterServiceClose() {
logger.warn("{} node closed while execution action [{}] for shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().shardId(), actionName, shardRoutingEntry.getShardRouting());
logger.warn("{} node closed while execution action [{}] for shard entry [{}]", shardEntry.failure, shardEntry.shardId, actionName, shardEntry);
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@@ -168,7 +178,7 @@ public class ShardStateAction extends AbstractComponent {
}, MasterNodeChangePredicate.INSTANCE);
}
private static class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private static class ShardFailedTransportHandler implements TransportRequestHandler<ShardEntry> {
private final ClusterService clusterService;
private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
private final ESLogger logger;
@@ -180,8 +190,8 @@ public class ShardStateAction extends AbstractComponent {
}
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
logger.warn("{} received shard failed for {}", request.failure, request.shardRouting.shardId(), request);
public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception {
logger.warn("{} received shard failed for {}", request.failure, request.shardId, request);
clusterService.submitStateUpdateTask(
"shard-failed",
request,
@@ -190,22 +200,22 @@ public class ShardStateAction extends AbstractComponent {
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
logger.error("{} unexpected failure while failing shard [{}]", e, request.shardRouting.shardId(), request.shardRouting);
logger.error("{} unexpected failure while failing shard [{}]", e, request.shardId, request);
try {
channel.sendResponse(e);
} catch (Exception channelException) {
channelException.addSuppressed(e);
logger.warn("{} failed to send failure [{}] while failing shard [{}]", channelException, request.shardRouting.shardId(), e, request.shardRouting);
logger.warn("{} failed to send failure [{}] while failing shard [{}]", channelException, request.shardId, e, request);
}
}
@Override
public void onNoLongerMaster(String source) {
logger.error("{} no longer master while failing shard [{}]", request.shardRouting.shardId(), request.shardRouting);
logger.error("{} no longer master while failing shard [{}]", request.shardId, request);
try {
channel.sendResponse(new NotMasterException(source));
} catch (Exception channelException) {
logger.warn("{} failed to send no longer master while failing shard [{}]", channelException, request.shardRouting.shardId(), request.shardRouting);
logger.warn("{} failed to send no longer master while failing shard [{}]", channelException, request.shardId, request);
}
}
@@ -214,7 +224,7 @@ public class ShardStateAction extends AbstractComponent {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Exception channelException) {
logger.warn("{} failed to send response while failing shard [{}]", channelException, request.shardRouting.shardId(), request.shardRouting);
logger.warn("{} failed to send response while failing shard [{}]", channelException, request.shardId, request);
}
}
}
@@ -222,63 +232,81 @@ public class ShardStateAction extends AbstractComponent {
}
}
static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry> {
public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardEntry> {
private final AllocationService allocationService;
private final RoutingService routingService;
private final ESLogger logger;
ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) {
public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) {
this.allocationService = allocationService;
this.routingService = routingService;
this.logger = logger;
}
@Override
public String describeTasks(List<ShardRoutingEntry> tasks) {
return tasks.stream().map(entry -> entry.getShardRouting().toString()).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}
public BatchResult<ShardEntry> execute(ClusterState currentState, List<ShardEntry> tasks) throws Exception {
BatchResult.Builder<ShardEntry> batchResultBuilder = BatchResult.builder();
List<ShardEntry> tasksToBeApplied = new ArrayList<>();
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>();
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
@Override
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
for (ShardEntry task : tasks) {
IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex());
if (indexMetaData == null) {
// tasks that correspond to non-existent shards are marked as successful
logger.debug("{} ignoring shard failed task [{}] (unknown index {})", task.shardId, task, task.shardId.getIndex());
batchResultBuilder.success(task);
} else {
// non-local requests
if (task.primaryTerm > 0) {
long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id());
if (currentPrimaryTerm != task.primaryTerm) {
assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " +
"current cluster state (received [" + task.primaryTerm + "] but current is [" + currentPrimaryTerm + "])";
logger.debug("{} failing shard failed task [{}] (primary term {} does not match current term {})", task.shardId,
task, task.primaryTerm, indexMetaData.primaryTerm(task.shardId.id()));
batchResultBuilder.failure(task, new NoLongerPrimaryShardException(
task.shardId,
"primary term [" + task.primaryTerm + "] did not match current primary term [" + currentPrimaryTerm + "]"));
continue;
}
}
// partition tasks into those that correspond to shards
// that exist versus do not exist
Map<ValidationResult, List<ShardRoutingEntry>> partition =
tasks.stream().collect(Collectors.groupingBy(task -> validateTask(currentState, task)));
// tasks that correspond to non-existent shards are marked
// as successful
batchResultBuilder.successes(partition.getOrDefault(ValidationResult.SHARD_MISSING, Collections.emptyList()));
ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
if (matched == null) {
// tasks that correspond to non-existent shards are marked as successful
logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", task.shardId, task);
batchResultBuilder.success(task);
} else {
// remove duplicate actions as allocation service expects a clean list without duplicates
if (seenShardRoutings.contains(matched)) {
logger.trace("{} ignoring shard failed task [{}] (already scheduled to fail {})", task.shardId, task, matched);
tasksToBeApplied.add(task);
} else {
logger.debug("{} failing shard {} (shard failed task: [{}])", task.shardId, matched, task);
tasksToBeApplied.add(task);
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(matched, task.message, task.failure));
seenShardRoutings.add(matched);
}
}
}
}
assert tasksToBeApplied.size() >= shardRoutingsToBeApplied.size();
ClusterState maybeUpdatedState = currentState;
List<ShardRoutingEntry> tasksToFail = partition.getOrDefault(ValidationResult.VALID, Collections.emptyList());
try {
List<FailedRerouteAllocation.FailedShard> failedShards =
tasksToFail
.stream()
.map(task -> new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure))
.collect(Collectors.toList());
RoutingAllocation.Result result = applyFailedShards(currentState, failedShards);
RoutingAllocation.Result result = applyFailedShards(currentState, shardRoutingsToBeApplied);
if (result.changed()) {
maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
}
batchResultBuilder.successes(tasksToFail);
batchResultBuilder.successes(tasksToBeApplied);
} catch (Exception e) {
logger.warn("failed to apply failed shards {}", e, shardRoutingsToBeApplied);
// failures are communicated back to the requester
// cluster state will not be updated in this case
batchResultBuilder.failures(tasksToFail, e);
batchResultBuilder.failures(tasksToBeApplied, e);
}
partition
.getOrDefault(ValidationResult.SOURCE_INVALID, Collections.emptyList())
.forEach(task -> batchResultBuilder.failure(
task,
new NoLongerPrimaryShardException(
task.getShardRouting().shardId(),
"source shard [" + task.sourceShardRouting + "] is neither the local allocation nor the primary allocation")
));
return batchResultBuilder.build(maybeUpdatedState);
}
@@ -287,36 +315,6 @@ public class ShardStateAction extends AbstractComponent {
return allocationService.applyFailedShards(currentState, failedShards);
}
private enum ValidationResult {
VALID,
SOURCE_INVALID,
SHARD_MISSING
}
private ValidationResult validateTask(ClusterState currentState, ShardRoutingEntry task) {
// non-local requests
if (!task.shardRouting.isSameAllocation(task.sourceShardRouting)) {
IndexShardRoutingTable indexShard = currentState.getRoutingTable().shardRoutingTableOrNull(task.shardRouting.shardId());
if (indexShard == null) {
return ValidationResult.SOURCE_INVALID;
}
ShardRouting primaryShard = indexShard.primaryShard();
if (primaryShard == null || !primaryShard.isSameAllocation(task.sourceShardRouting)) {
return ValidationResult.SOURCE_INVALID;
}
}
RoutingNode routingNode = currentState.getRoutingNodes().node(task.getShardRouting().currentNodeId());
if (routingNode != null) {
ShardRouting maybe = routingNode.getByShardId(task.getShardRouting().shardId());
if (maybe != null && maybe.isSameAllocation(task.getShardRouting())) {
return ValidationResult.VALID;
}
}
return ValidationResult.SHARD_MISSING;
}
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
int numberOfUnassignedShards = clusterChangedEvent.state().getRoutingNodes().unassigned().size();
@@ -332,11 +330,11 @@ public class ShardStateAction extends AbstractComponent {
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, shardRouting, message, null);
sendShardAction(SHARD_STARTED_ACTION_NAME, observer, shardRoutingEntry, listener);
ShardEntry shardEntry = new ShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, null);
sendShardAction(SHARD_STARTED_ACTION_NAME, observer, shardEntry, listener);
}
private static class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private static class ShardStartedTransportHandler implements TransportRequestHandler<ShardEntry> {
private final ClusterService clusterService;
private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor;
private final ESLogger logger;
@@ -348,8 +346,8 @@ public class ShardStateAction extends AbstractComponent {
}
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
logger.debug("{} received shard started for [{}]", request.shardRouting.shardId(), request);
public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception {
logger.debug("{} received shard started for [{}]", request.shardId, request);
clusterService.submitStateUpdateTask(
"shard-started",
request,
@@ -360,7 +358,7 @@ public class ShardStateAction extends AbstractComponent {
}
}
private static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardEntry>, ClusterStateTaskListener {
private final AllocationService allocationService;
private final ESLogger logger;
@@ -370,17 +368,45 @@ public class ShardStateAction extends AbstractComponent {
}
@Override
public String describeTasks(List<ShardRoutingEntry> tasks) {
return tasks.stream().map(entry -> entry.getShardRouting().toString()).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}
@Override
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
BatchResult.Builder<ShardRoutingEntry> builder = BatchResult.builder();
public BatchResult<ShardEntry> execute(ClusterState currentState, List<ShardEntry> tasks) throws Exception {
BatchResult.Builder<ShardEntry> builder = BatchResult.builder();
List<ShardEntry> tasksToBeApplied = new ArrayList<>();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
for (ShardRoutingEntry task : tasks) {
shardRoutingsToBeApplied.add(task.shardRouting);
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
for (ShardEntry task : tasks) {
assert task.primaryTerm == 0L : "shard is only started by itself: " + task;
ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
if (matched == null) {
// tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started
// events on every cluster state publishing that does not contain the shard as started yet. This means that old stale
// requests might still be in flight even after the shard has already been started or failed on the master. We just
// ignore these requests for now.
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task);
builder.success(task);
} else {
if (matched.initializing() == false) {
assert matched.active() : "expected active shard routing for task " + task + " but found " + matched;
// same as above, this might have been a stale in-flight request, so we just ignore.
logger.debug("{} ignoring shard started task [{}] (shard exists but is not initializing: {})", task.shardId, task,
matched);
builder.success(task);
} else {
// remove duplicate actions as allocation service expects a clean list without duplicates
if (seenShardRoutings.contains(matched)) {
logger.trace("{} ignoring shard started task [{}] (already scheduled to start {})", task.shardId, task, matched);
tasksToBeApplied.add(task);
} else {
logger.debug("{} starting shard {} (shard started task: [{}])", task.shardId, matched, task);
tasksToBeApplied.add(task);
shardRoutingsToBeApplied.add(matched);
seenShardRoutings.add(matched);
}
}
}
}
assert tasksToBeApplied.size() >= shardRoutingsToBeApplied.size();
ClusterState maybeUpdatedState = currentState;
try {
RoutingAllocation.Result result =
@@ -388,9 +414,10 @@ public class ShardStateAction extends AbstractComponent {
if (result.changed()) {
maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
}
builder.successes(tasks);
builder.successes(tasksToBeApplied);
} catch (Exception e) {
builder.failures(tasks, e);
logger.warn("failed to apply started shards {}", e, shardRoutingsToBeApplied);
builder.failures(tasksToBeApplied, e);
}
return builder.build(maybeUpdatedState);
@@ -402,31 +429,38 @@ public class ShardStateAction extends AbstractComponent {
}
}
public static class ShardRoutingEntry extends TransportRequest {
ShardRouting shardRouting;
ShardRouting sourceShardRouting;
public static class ShardEntry extends TransportRequest {
ShardId shardId;
String allocationId;
long primaryTerm;
String message;
Exception failure;
public ShardRoutingEntry() {
public ShardEntry() {
}
ShardRoutingEntry(ShardRouting shardRouting, ShardRouting sourceShardRouting, String message, @Nullable Exception failure) {
this.shardRouting = shardRouting;
this.sourceShardRouting = sourceShardRouting;
public ShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, @Nullable Exception failure) {
this.shardId = shardId;
this.allocationId = allocationId;
this.primaryTerm = primaryTerm;
this.message = message;
this.failure = failure;
}
public ShardRouting getShardRouting() {
return shardRouting;
public ShardId getShardId() {
return shardId;
}
public String getAllocationId() {
return allocationId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardRouting = new ShardRouting(in);
sourceShardRouting = new ShardRouting(in);
shardId = ShardId.readShardId(in);
allocationId = in.readString();
primaryTerm = in.readVLong();
message = in.readString();
failure = in.readException();
}
@@ -434,8 +468,9 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardRouting.writeTo(out);
sourceShardRouting.writeTo(out);
shardId.writeTo(out);
out.writeString(allocationId);
out.writeVLong(primaryTerm);
out.writeString(message);
out.writeException(failure);
}
@@ -443,8 +478,9 @@ public class ShardStateAction extends AbstractComponent {
@Override
public String toString() {
List<String> components = new ArrayList<>(4);
components.add("target shard [" + shardRouting + "]");
components.add("source shard [" + sourceShardRouting + "]");
components.add("shard id [" + shardId + "]");
components.add("allocation id [" + allocationId + "]");
components.add("primary term [" + primaryTerm + "]");
components.add("message [" + message + "]");
if (failure != null) {
components.add("failure [" + ExceptionsHelper.detailedMessage(failure) + "]");

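Pulling this file together: the new ShardEntry encodes the authority convention in a single long. A hedged summary sketch follows (plain Java, a simplified stand-in for the real ShardEntry, not the wire class itself): a term greater than zero means the request is authorized by a primary and the master rejects it when the term no longer matches; a term of zero means a local notification with no term check.

final class ShardEntryConvention {
    final String shardId;
    final String allocationId;
    final long primaryTerm; // 0 = local request, >0 = authorized by that primary term
    final String message;

    ShardEntryConvention(String shardId, String allocationId, long primaryTerm, String message) {
        this.shardId = shardId;
        this.allocationId = allocationId;
        this.primaryTerm = primaryTerm;
        this.message = message;
    }

    // mirrors remoteShardFailed: a primary reports a replica it could not reach
    static ShardEntryConvention remoteFailure(String shardId, String allocationId, long primaryTerm, String message) {
        assert primaryTerm > 0L : "primary term should be strictly positive";
        return new ShardEntryConvention(shardId, allocationId, primaryTerm, message);
    }

    // mirrors localShardFailed and shardStarted: no primary authority involved
    static ShardEntryConvention localNotification(String shardId, String allocationId, String message) {
        return new ShardEntryConvention(shardId, allocationId, 0L, message);
    }
}
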
View File: IndexShardRoutingTable.java

@@ -98,7 +98,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
if (shard.relocating()) {
// create the target initializing shard routing on the node the shard is relocating to
allInitializingShards.add(shard.buildTargetRelocatingShard());
allInitializingShards.add(shard.getTargetRelocatingShard());
}
if (shard.assignedToNode()) {
assignedShards.add(shard);

View File: RoutingNodes.java

@@ -108,7 +108,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
k -> new LinkedHashMap<>()); // LinkedHashMap to preserve order
// add the counterpart shard with relocatingNodeId reflecting the source from which
// it's relocating from.
ShardRouting targetShardRouting = shard.buildTargetRelocatingShard();
ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
addInitialRecovery(targetShardRouting, indexShard.primary);
previousValue = entries.put(targetShardRouting.shardId(), targetShardRouting);
if (previousValue != null) {
@@ -276,6 +276,20 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return replicaSet == null ? EMPTY : Collections.unmodifiableList(replicaSet);
}
@Nullable
public ShardRouting getByAllocationId(ShardId shardId, String allocationId) {
final List<ShardRouting> replicaSet = assignedShards.get(shardId);
if (replicaSet == null) {
return null;
}
for (ShardRouting shardRouting : replicaSet) {
if (shardRouting.allocationId().getId().equals(allocationId)) {
return shardRouting;
}
}
return null;
}
/**
* Returns the active primary shard for the given shard id or <code>null</code> if
* no primary is found or the primary is not active.
@@ -406,7 +420,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
ensureMutable();
relocatingShards++;
ShardRouting source = shard.relocate(nodeId, expectedShardSize);
ShardRouting target = source.buildTargetRelocatingShard();
ShardRouting target = source.getTargetRelocatingShard();
updateAssigned(shard, source);
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);

View File: RoutingTable.java

@@ -27,6 +27,7 @@ import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -145,6 +146,26 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
.orElse(null);
}
@Nullable
public ShardRouting getByAllocationId(ShardId shardId, String allocationId) {
IndexShardRoutingTable shardRoutingTable = shardRoutingTableOrNull(shardId);
if (shardRoutingTable == null) {
return null;
}
for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
if (shardRouting.allocationId().getId().equals(allocationId)) {
return shardRouting;
}
if (shardRouting.relocating()) {
if (shardRouting.getTargetRelocatingShard().allocationId().getId().equals(allocationId)) {
return shardRouting.getTargetRelocatingShard();
}
}
}
return null;
}
public boolean validate(MetaData metaData) {
for (IndexRoutingTable indexRoutingTable : this) {
if (indexRoutingTable.validate(metaData) == false) {
@@ -245,7 +266,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
if (predicate.test(shardRouting)) {
set.add(shardRouting.shardsIt());
if (includeRelocationTargets && shardRouting.relocating()) {
set.add(new PlainShardIterator(shardRouting.shardId(), Collections.singletonList(shardRouting.buildTargetRelocatingShard())));
set.add(new PlainShardIterator(shardRouting.shardId(), Collections.singletonList(shardRouting.getTargetRelocatingShard())));
}
} else if (includeEmpty) { // we need this for counting properly, just make it an empty one
set.add(new PlainShardIterator(shardRouting.shardId(), Collections.<ShardRouting>emptyList()));
@@ -278,7 +299,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
if (predicate.test(shardRouting)) {
shards.add(shardRouting);
if (includeRelocationTargets && shardRouting.relocating()) {
shards.add(shardRouting.buildTargetRelocatingShard());
shards.add(shardRouting.getTargetRelocatingShard());
}
}
}
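
One subtlety in the getByAllocationId addition above: a relocating shard and its initializing target carry different allocation ids, so the routing-table lookup must also check each copy's relocation target. A minimal sketch of that resolution rule (stand-in record types and hypothetical names, not the ES classes):

import java.util.List;

final class RelocationAwareLookup {
    // one assigned copy of a shard: its own allocation id, plus the target's
    // allocation id when the copy is currently relocating (null otherwise)
    record Copy(String allocationId, String targetAllocationId) {
        boolean relocating() { return targetAllocationId != null; }
    }

    static Copy getByAllocationId(List<Copy> assignedCopies, String allocationId) {
        for (Copy copy : assignedCopies) {
            if (copy.allocationId().equals(allocationId)) {
                return copy; // matched an assigned copy directly
            }
            if (copy.relocating() && allocationId.equals(copy.targetAllocationId())) {
                // matched the initializing relocation target of this copy
                return new Copy(copy.targetAllocationId(), null);
            }
        }
        return null; // no copy with that allocation id exists anymore
    }
}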

View File: ShardRouting.java

@@ -56,6 +56,8 @@ public final class ShardRouting implements Writeable, ToXContent {
private final AllocationId allocationId;
private final transient List<ShardRouting> asList;
private final long expectedShardSize;
@Nullable
private final ShardRouting targetRelocatingShard;
/**
* A constructor to internally create shard routing instances, note, the internal flag should only be set to true
@@ -74,11 +76,22 @@ public final class ShardRouting implements Writeable, ToXContent {
this.unassignedInfo = unassignedInfo;
this.allocationId = allocationId;
this.expectedShardSize = expectedShardSize;
this.targetRelocatingShard = initializeTargetRelocatingShard();
assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE || state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
assert expectedShardSize >= 0 || state != ShardRoutingState.INITIALIZING || state != ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
}
@Nullable
private ShardRouting initializeTargetRelocatingShard() {
if (state == ShardRoutingState.RELOCATING) {
return new ShardRouting(shardId, relocatingNodeId, currentNodeId, restoreSource, primary,
ShardRoutingState.INITIALIZING, unassignedInfo, AllocationId.newTargetRelocation(allocationId), expectedShardSize);
} else {
return null;
}
}
/**
* Creates a new unassigned shard.
*/
@@ -177,14 +190,13 @@ public final class ShardRouting implements Writeable, ToXContent {
}
/**
* Creates a shard routing representing the target shard.
* Returns a shard routing representing the target shard.
* The target shard routing will be the INITIALIZING state and have relocatingNodeId set to the
* source node.
*/
public ShardRouting buildTargetRelocatingShard() {
public ShardRouting getTargetRelocatingShard() {
assert relocating();
return new ShardRouting(shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, unassignedInfo,
AllocationId.newTargetRelocation(allocationId), expectedShardSize);
return targetRelocatingShard;
}
/**
@@ -282,6 +294,7 @@ public final class ShardRouting implements Writeable, ToXContent {
}
expectedShardSize = shardSize;
asList = Collections.singletonList(this);
targetRelocatingShard = initializeTargetRelocatingShard();
}
public ShardRouting(StreamInput in) throws IOException {
@@ -453,7 +466,7 @@ public final class ShardRouting implements Writeable, ToXContent {
}
/**
* Returns <code>true</code> if this shard is a relocation target for another shard (i.e., was created with {@link #buildTargetRelocatingShard()}
* Returns <code>true</code> if this shard is a relocation target for another shard (i.e., was created with {@link #initializeTargetRelocatingShard()}
*/
public boolean isRelocationTarget() {
return state == ShardRoutingState.INITIALIZING && relocatingNodeId != null;
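
The change from buildTargetRelocatingShard() to getTargetRelocatingShard() above is not just a rename: the target routing is now computed once in the constructor and cached, so every call returns the identical instance. That is what allows callers (see the asserts in AllocationService below) to compare routings by reference. A small sketch of the pattern, with hypothetical names:

final class CachedDerivedInstance {
    private final boolean relocating;
    private final CachedDerivedInstance targetRelocatingShard; // precomputed, null unless relocating

    CachedDerivedInstance(boolean relocating) {
        this.relocating = relocating;
        this.targetRelocatingShard = relocating ? new CachedDerivedInstance(false) : null;
    }

    CachedDerivedInstance getTargetRelocatingShard() {
        assert relocating;
        return targetRelocatingShard; // same instance on every call
    }

    public static void main(String[] args) {
        CachedDerivedInstance source = new CachedDerivedInstance(true);
        // rebuilding the target on each call would make this identity check fail:
        System.out.println(source.getTargetRelocatingShard() == source.getTargetRelocatingShard()); // true
    }
}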

View File: AllocationService.java

@@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation.Result;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@@ -84,23 +85,25 @@ public class AllocationService extends AbstractComponent {
}
/**
* Applies the started shards. Note, shards can be called several times within this method.
* Applies the started shards. Note, only initializing ShardRouting instances that exist in the routing table should be
* provided as parameter and no duplicates should be contained.
* <p>
* If the same instance of the routing table is returned, then no change has been made.</p>
*/
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards) {
public Result applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
return applyStartedShards(clusterState, startedShards, true);
}
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards, boolean withReroute) {
public Result applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards, boolean withReroute) {
if (startedShards.isEmpty()) {
return new Result(false, clusterState.routingTable(), clusterState.metaData());
}
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState, startedShards, clusterInfoService.getClusterInfo(), currentNanoTime());
boolean changed = applyStartedShards(allocation, startedShards);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState, startedShards,
clusterInfoService.getClusterInfo(), currentNanoTime());
applyStartedShards(allocation, startedShards);
gatewayAllocator.applyStartedShards(allocation);
if (withReroute) {
reroute(allocation);
@@ -109,12 +112,12 @@ public class AllocationService extends AbstractComponent {
return buildResultAndLogHealthChange(allocation, "shards started [" + startedShardsAsString + "] ...");
}
protected RoutingAllocation.Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason) {
protected Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason) {
return buildResultAndLogHealthChange(allocation, reason, new RoutingExplanations());
}
protected RoutingAllocation.Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason, RoutingExplanations explanations) {
protected Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason, RoutingExplanations explanations) {
MetaData oldMetaData = allocation.metaData();
RoutingTable oldRoutingTable = allocation.routingTable();
RoutingNodes newRoutingNodes = allocation.routingNodes();
@@ -128,7 +131,7 @@ public class AllocationService extends AbstractComponent {
metaData(newMetaData).routingTable(newRoutingTable).build()),
reason
);
return new RoutingAllocation.Result(true, newRoutingTable, newMetaData, explanations);
return new Result(true, newRoutingTable, newMetaData, explanations);
}
/**
@@ -186,7 +189,7 @@ public class AllocationService extends AbstractComponent {
// we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to
// be initializing. However, when the target shard is activated, we still want the primary term to stay
// the same
(oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.buildTargetRelocatingShard()))) {
(oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.getTargetRelocatingShard()))) {
// do nothing
} else {
// incrementing the primary term
@@ -210,37 +213,44 @@ public class AllocationService extends AbstractComponent {
}
}
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
public Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
return applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(failedShard, null, null)));
}
/**
* Applies the failed shards. Note, shards can be called several times within this method.
* Applies the failed shards. Note, only assigned ShardRouting instances that exist in the routing table should be
* provided as parameter and no duplicates should be contained.
*
* <p>
* If the same instance of the routing table is returned, then no change has been made.</p>
*/
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
public Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
if (failedShards.isEmpty()) {
return new Result(false, clusterState.routingTable(), clusterState.metaData());
}
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
long currentNanoTime = currentNanoTime();
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState, failedShards, clusterInfoService.getClusterInfo(), currentNanoTime);
boolean changed = false;
// as failing primaries also fail associated replicas, we fail replicas first here so that their nodes are added to ignore list
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState, failedShards,
clusterInfoService.getClusterInfo(), currentNanoTime);
// as failing primaries also fail associated replicas, we fail replicas first here to avoid re-resolving replica ShardRouting
List<FailedRerouteAllocation.FailedShard> orderedFailedShards = new ArrayList<>(failedShards);
orderedFailedShards.sort(Comparator.comparing(failedShard -> failedShard.shard.primary()));
for (FailedRerouteAllocation.FailedShard failedShard : orderedFailedShards) {
UnassignedInfo unassignedInfo = failedShard.shard.unassignedInfo();
final int failedAllocations = unassignedInfo != null ? unassignedInfo.getNumFailedAllocations() : 0;
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure,
failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT));
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
orderedFailedShards.sort(Comparator.comparing(failedShard -> failedShard.routingEntry.primary()));
for (FailedRerouteAllocation.FailedShard failedShardEntry : orderedFailedShards) {
ShardRouting failedShard = failedShardEntry.routingEntry;
final int failedAllocations = failedShard.unassignedInfo() != null ? failedShard.unassignedInfo().getNumFailedAllocations() : 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShardEntry.message,
failedShardEntry.failure, failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
applyFailedShard(allocation, failedShard, unassignedInfo);
}
gatewayAllocator.applyFailedShards(allocation);
reroute(allocation);
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.routingEntry.shardId().toString());
return buildResultAndLogHealthChange(allocation, "shards failed [" + failedShardsAsString + "] ...");
}
@@ -259,9 +269,9 @@ public class AllocationService extends AbstractComponent {
metaData.getIndexSafe(shardRouting.index()).getSettings());
if (newComputedLeftDelayNanos == 0) {
changed = true;
unassignedIterator.updateUnassignedInfo(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(),
unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), false,
unassignedInfo.getLastAllocationStatus()));
unassignedIterator.updateUnassignedInfo(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()));
}
}
}
@@ -285,7 +295,7 @@ public class AllocationService extends AbstractComponent {
.collect(Collectors.joining(", "));
}
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean explain, boolean retryFailed) {
public Result reroute(ClusterState clusterState, AllocationCommands commands, boolean explain, boolean retryFailed) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// we don't shuffle the unassigned shards here, to try and get as close as possible to
// a consistent result of the effect the commands have on the routing
@@ -311,7 +321,7 @@ public class AllocationService extends AbstractComponent {
* <p>
* If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingAllocation.Result reroute(ClusterState clusterState, String reason) {
public Result reroute(ClusterState clusterState, String reason) {
return reroute(clusterState, reason, false);
}
@@ -320,7 +330,7 @@ public class AllocationService extends AbstractComponent {
* <p>
* If the same instance of the routing table is returned, then no change has been made.
*/
protected RoutingAllocation.Result reroute(ClusterState clusterState, String reason, boolean debug) {
protected Result reroute(ClusterState clusterState, String reason, boolean debug) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
@@ -328,7 +338,7 @@ public class AllocationService extends AbstractComponent {
clusterInfoService.getClusterInfo(), currentNanoTime(), false);
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
return new Result(false, clusterState.routingTable(), clusterState.metaData());
}
return buildResultAndLogHealthChange(allocation, reason);
}
@@ -420,7 +430,7 @@ public class AllocationService extends AbstractComponent {
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
applyFailedShard(allocation, shardRouting, unassignedInfo);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
// since it relies on the fact that the RoutingNode exists in the list of nodes
@@ -429,111 +439,70 @@ public class AllocationService extends AbstractComponent {
return changed;
}
private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, ShardRouting primary) {
private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, ShardRouting failedPrimary) {
assert failedPrimary.primary() : "can only fail replicas for primary shard: " + failedPrimary;
List<ShardRouting> replicas = new ArrayList<>();
for (ShardRouting routing : allocation.routingNodes().assignedShards(primary.shardId())) {
for (ShardRouting routing : allocation.routingNodes().assignedShards(failedPrimary.shardId())) {
if (!routing.primary() && routing.initializing()) {
replicas.add(routing);
}
}
boolean changed = false;
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT));
for (ShardRouting failedReplica : replicas) {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
"primary failed while replica initializing", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
applyFailedShard(allocation, failedReplica, unassignedInfo);
}
return changed;
return replicas.isEmpty() == false;
}
private boolean applyStartedShards(RoutingAllocation routingAllocation, Iterable<? extends ShardRouting> startedShardEntries) {
boolean dirty = false;
// apply shards might be called several times with the same shard, ignore it
private void applyStartedShards(RoutingAllocation routingAllocation, List<ShardRouting> startedShardEntries) {
assert startedShardEntries.isEmpty() == false : "non-empty list of started shard entries expected";
RoutingNodes routingNodes = routingAllocation.routingNodes();
for (ShardRouting startedShard : startedShardEntries) {
assert startedShard.initializing();
assert startedShard.initializing() : "only initializing shards can be started";
assert routingAllocation.metaData().index(startedShard.shardId().getIndex()) != null :
"shard started for unknown index (shard entry: " + startedShard + ")";
assert startedShard == routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId()) :
"shard routing to start does not exist in routing table, expected: " + startedShard + " but was: " +
routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());
// validate index still exists. strictly speaking this is not needed but it gives clearer logs
if (routingAllocation.metaData().index(startedShard.index()) == null) {
logger.debug("{} ignoring shard started, unknown index (routing: {})", startedShard.shardId(), startedShard);
continue;
}
routingNodes.started(startedShard);
logger.trace("{} marked shard as started (routing: {})", startedShard.shardId(), startedShard);
RoutingNode currentRoutingNode = routingNodes.node(startedShard.currentNodeId());
if (currentRoutingNode == null) {
logger.debug("{} failed to find shard in order to start it [failed to find node], ignoring (routing: {})", startedShard.shardId(), startedShard);
continue;
}
ShardRouting matchingShard = currentRoutingNode.getByShardId(startedShard.shardId());
if (matchingShard == null) {
logger.debug("{} failed to find shard in order to start it [failed to find shard], ignoring (routing: {})", startedShard.shardId(), startedShard);
} else if (matchingShard.isSameAllocation(startedShard) == false) {
logger.debug("{} failed to find shard with matching allocation id in order to start it [failed to find matching shard], ignoring (routing: {}, matched shard routing: {})", startedShard.shardId(), startedShard, matchingShard);
} else {
startedShard = matchingShard;
if (startedShard.active()) {
logger.trace("{} shard is already started, ignoring (routing: {})", startedShard.shardId(), startedShard);
} else {
assert startedShard.initializing();
dirty = true;
routingNodes.started(startedShard);
logger.trace("{} marked shard as started (routing: {})", startedShard.shardId(), startedShard);
if (startedShard.relocatingNodeId() != null) {
// relocation target has been started, remove relocation source
RoutingNode relocationSourceNode = routingNodes.node(startedShard.relocatingNodeId());
ShardRouting relocationSourceShard = relocationSourceNode.getByShardId(startedShard.shardId());
assert relocationSourceShard.isRelocationSourceOf(startedShard);
routingNodes.remove(relocationSourceShard);
}
}
if (startedShard.relocatingNodeId() != null) {
// relocation target has been started, remove relocation source
RoutingNode relocationSourceNode = routingNodes.node(startedShard.relocatingNodeId());
ShardRouting relocationSourceShard = relocationSourceNode.getByShardId(startedShard.shardId());
assert relocationSourceShard.isRelocationSourceOf(startedShard);
assert relocationSourceShard.getTargetRelocatingShard() == startedShard : "relocation target mismatch, expected: "
+ startedShard + " but was: " + relocationSourceShard.getTargetRelocatingShard();
routingNodes.remove(relocationSourceShard);
}
}
return dirty;
}
/**
* Applies the relevant logic to handle a failed shard. Returns <tt>true</tt> if changes happened that
* require relocation.
* Applies the relevant logic to handle a failed shard.
*/
private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList, UnassignedInfo unassignedInfo) {
IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index());
if (indexRoutingTable == null) {
logger.debug("{} ignoring shard failure, unknown index in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}
private void applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, UnassignedInfo unassignedInfo) {
RoutingNodes routingNodes = allocation.routingNodes();
assert failedShard.assignedToNode() : "only assigned shards can be failed";
assert allocation.metaData().index(failedShard.shardId().getIndex()) != null :
"shard failed for unknown index (shard entry: " + failedShard + ")";
assert routingNodes.getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard :
"shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " +
routingNodes.getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());
RoutingNode matchedNode = routingNodes.node(failedShard.currentNodeId());
if (matchedNode == null) {
logger.debug("{} ignoring shard failure, unknown node in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}
ShardRouting matchedShard = matchedNode.getByShardId(failedShard.shardId());
if (matchedShard != null && matchedShard.isSameAllocation(failedShard)) {
logger.debug("{} failed shard {} found in routingNodes, failing it ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
// replace incoming instance to make sure we work on the latest one
failedShard = matchedShard;
} else {
logger.debug("{} ignoring shard failure, unknown allocation id in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}
logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
if (failedShard.primary()) {
// fail replicas first otherwise we move RoutingNodes into an inconsistent state
failReplicasForUnassignedPrimary(allocation, failedShard);
}
if (addToIgnoreList) {
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
}
cancelShard(logger, failedShard, unassignedInfo, routingNodes);
assert matchedNode.getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard + " was matched but wasn't removed";
return true;
assert routingNodes.node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
" was matched but wasn't removed";
}
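// A standalone sketch, separate from this diff: the asserts above make
// applyFailedShard strict, as described in the commit message. The caller
// (ShardStateAction) resolves routing entries first, so a shard that cannot
// be found by allocation id is a hard bug rather than something to skip.
// Simplified stand-in types; nothing here is the real AllocationService API.
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class StrictFailSketch {
    // allocation id -> node currently holding that shard copy
    static String failShard(Map<String, String> nodeByAllocationId, String allocationId) {
        String node = nodeByAllocationId.get(allocationId);
        // lenient code would return here; strict code treats this as a caller bug
        Objects.requireNonNull(node, "shard routing to fail does not exist in routing table");
        nodeByAllocationId.remove(allocationId);
        return node;
    }

    public static void main(String[] args) {
        Map<String, String> routing = new HashMap<>();
        routing.put("alloc-1", "node1");
        System.out.println("failed on: " + failShard(routing, "alloc-1"));
        try {
            failShard(routing, "alloc-1"); // duplicate failure: rejected, not ignored
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}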
public static void cancelShard(ESLogger logger, ShardRouting cancelledShard, UnassignedInfo unassignedInfo, RoutingNodes routingNodes) {
@ -544,11 +513,13 @@ public class AllocationService extends AbstractComponent {
// The shard is a target of a relocating shard. In that case we only
// need to remove the target shard and cancel the source relocation.
// No shard is left unassigned
logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", cancelledShard, unassignedInfo.shortSummary());
logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", cancelledShard,
unassignedInfo.shortSummary());
RoutingNode sourceNode = routingNodes.node(cancelledShard.relocatingNodeId());
ShardRouting sourceShard = sourceNode.getByShardId(cancelledShard.shardId());
assert sourceShard.isRelocationSourceOf(cancelledShard);
logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", cancelledShard.shardId(), sourceShard, unassignedInfo.shortSummary());
logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", cancelledShard.shardId(), sourceShard,
unassignedInfo.shortSummary());
routingNodes.cancelRelocation(sourceShard);
routingNodes.remove(cancelledShard);
} else {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.index.shard.ShardId;
import java.util.List;
@ -39,25 +40,28 @@ public class FailedRerouteAllocation extends RoutingAllocation {
* details on why it failed.
*/
public static class FailedShard {
public final ShardRouting shard;
public final ShardRouting routingEntry;
public final String message;
public final Exception failure;
public FailedShard(ShardRouting shard, String message, Exception failure) {
this.shard = shard;
public FailedShard(ShardRouting routingEntry, String message, Exception failure) {
assert routingEntry.assignedToNode() : "only assigned shards can be failed " + routingEntry;
this.routingEntry = routingEntry;
this.message = message;
this.failure = failure;
}
@Override
public String toString() {
return "failed shard, shard " + shard + ", message [" + message + "], failure [" + ExceptionsHelper.detailedMessage(failure) + "]";
return "failed shard, shard " + routingEntry + ", message [" + message + "], failure [" +
ExceptionsHelper.detailedMessage(failure) + "]";
}
}
private final List<FailedShard> failedShards;
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List<FailedShard> failedShards, ClusterInfo clusterInfo, long currentNanoTime) {
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState,
List<FailedShard> failedShards, ClusterInfo clusterInfo, long currentNanoTime) {
super(deciders, routingNodes, clusterState, clusterInfo, currentNanoTime, false);
this.failedShards = failedShards;
}
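The constructor assert added above is a fail-fast guard: an unassigned shard can never be failed, so rejecting such a task at construction time surfaces the bug at the producer instead of deep inside the allocation code. A minimal standalone illustration of the pattern follows (simplified stand-in fields, not the class above):

class FailedShardSketch {
    static final class FailedShard {
        final String nodeId;   // null would mean "not assigned to a node"
        final String message;

        FailedShard(String nodeId, String message) {
            // validate the invariant where the task is created (requires -ea)
            assert nodeId != null : "only assigned shards can be failed";
            this.nodeId = nodeId;
            this.message = message;
        }

        @Override
        public String toString() {
            return "failed shard on [" + nodeId + "], message [" + message + "]";
        }
    }

    public static void main(String[] args) {
        System.out.println(new FailedShard("node1", "disk error"));
        // new FailedShard(null, "boom"); // would trip the constructor assert
    }
}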

View File

@ -150,7 +150,8 @@ public class RoutingAllocation {
* @param clusterState cluster state before rerouting
* @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
*/
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, ClusterInfo clusterInfo, long currentNanoTime, boolean retryFailed) {
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, ClusterInfo clusterInfo,
long currentNanoTime, boolean retryFailed) {
this.deciders = deciders;
this.routingNodes = routingNodes;
this.metaData = clusterState.metaData();

View File

@ -33,9 +33,10 @@ import java.util.List;
*/
public class StartedRerouteAllocation extends RoutingAllocation {
private final List<? extends ShardRouting> startedShards;
private final List<ShardRouting> startedShards;
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List<? extends ShardRouting> startedShards, ClusterInfo clusterInfo, long currentNanoTime) {
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState,
List<ShardRouting> startedShards, ClusterInfo clusterInfo, long currentNanoTime) {
super(deciders, routingNodes, clusterState, clusterInfo, currentNanoTime, false);
this.startedShards = startedShards;
}
@ -44,7 +45,7 @@ public class StartedRerouteAllocation extends RoutingAllocation {
* Get started shards
* @return list of started shards
*/
public List<? extends ShardRouting> startedShards() {
public List<ShardRouting> startedShards() {
return startedShards;
}
}
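The change from List<? extends ShardRouting> to List<ShardRouting> above is more than cosmetic: the upper-bounded wildcard makes the list effectively read-only for the callee, and the flexibility is no longer needed now that ShardStateAction resolves concrete ShardRouting instances before calling in. A plain-Java sketch of the difference, independent of the classes in this diff:

import java.util.ArrayList;
import java.util.List;

class WildcardSketch {
    static void readOnly(List<? extends Number> xs) {
        Number first = xs.get(0); // reading is fine
        // xs.add(1);             // does not compile: capture of ? extends Number
        System.out.println("first=" + first);
    }

    static void readWrite(List<Number> xs) {
        xs.add(2); // the invariant type permits mutation
        System.out.println("after add: " + xs);
    }

    public static void main(String[] args) {
        List<Integer> ints = new ArrayList<>(List.of(1));
        readOnly(ints);                  // wildcard accepts List<Integer>
        List<Number> nums = new ArrayList<>(List.of(1));
        readWrite(nums);                 // invariant type requires List<Number>
    }
}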

View File

@ -188,11 +188,11 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
} else if (shardRouting.relocating()) {
initializingShard = shardRouting.cancelRelocation()
.relocate(currentNodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)
.buildTargetRelocatingShard();
.getTargetRelocatingShard();
} else {
assert shardRouting.started();
initializingShard = shardRouting.relocate(currentNodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)
.buildTargetRelocatingShard();
.getTargetRelocatingShard();
}
assert initializingShard.initializing();
return initializingShard;

View File

@ -124,8 +124,8 @@ public class GatewayAllocator extends AbstractComponent {
public void applyFailedShards(FailedRerouteAllocation allocation) {
for (FailedRerouteAllocation.FailedShard shard : allocation.failedShards()) {
Releasables.close(asyncFetchStarted.remove(shard.shard.shardId()));
Releasables.close(asyncFetchStore.remove(shard.shard.shardId()));
Releasables.close(asyncFetchStarted.remove(shard.routingEntry.shardId()));
Releasables.close(asyncFetchStore.remove(shard.routingEntry.shardId()));
}
}

View File

@ -216,7 +216,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
if (masterNode != null) { // TODO: can we remove this? Is resending shard failures the responsibility of shardStateAction?
String message = "master " + masterNode + " has not removed previously failed shard. resending shard failure";
logger.trace("[{}] re-sending failed shard [{}], reason [{}]", matchedRouting.shardId(), matchedRouting, message);
shardStateAction.shardFailed(matchedRouting, matchedRouting, message, null, SHARD_STATE_ACTION_LISTENER);
shardStateAction.localShardFailed(matchedRouting, message, null, SHARD_STATE_ACTION_LISTENER);
}
}
}
@ -686,7 +686,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
try {
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
failedShardsCache.put(shardRouting.shardId(), shardRouting);
shardStateAction.shardFailed(shardRouting, shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER);
shardStateAction.localShardFailed(shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER);
} catch (Exception inner) {
if (failure != null) inner.addSuppressed(failure);
logger.warn(

View File

@ -76,7 +76,7 @@ public class ReplicationOperationTests extends ESTestCase {
// simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
state = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
primaryShard = primaryShard.buildTargetRelocatingShard();
primaryShard = primaryShard.getTargetRelocatingShard();
}
final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state);
@ -161,7 +161,7 @@ public class ReplicationOperationTests extends ESTestCase {
// simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
state = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
primaryShard = primaryShard.buildTargetRelocatingShard();
primaryShard = primaryShard.getTargetRelocatingShard();
}
final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state);
@ -175,7 +175,7 @@ public class ReplicationOperationTests extends ESTestCase {
final ClusterState finalState = state;
final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) {
@Override
public void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception,
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
Consumer<Exception> onIgnoredFailure) {
assertThat(replica, equalTo(failedReplica));
@ -311,7 +311,7 @@ public class ReplicationOperationTests extends ESTestCase {
}
if (shardRouting.relocating() && localNodeId.equals(shardRouting.relocatingNodeId()) == false) {
expectedReplicas.add(shardRouting.buildTargetRelocatingShard());
expectedReplicas.add(shardRouting.getTargetRelocatingShard());
}
}
}
@ -422,7 +422,7 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
public void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception, Runnable onSuccess,
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
if (failedReplicas.add(replica) == false) {
fail("replica [" + replica + "] was failed twice");

View File

@ -534,15 +534,16 @@ public class TransportReplicationActionTests extends ESTestCase {
AtomicReference<Throwable> failure = new AtomicReference<>();
AtomicReference<Throwable> ignoredFailure = new AtomicReference<>();
AtomicBoolean success = new AtomicBoolean();
proxy.failShard(replica, shardRoutings.primaryShard(), "test", new ElasticsearchException("simulated"),
proxy.failShard(replica, randomIntBetween(0, 10), "test", new ElasticsearchException("simulated"),
() -> success.set(true), failure::set, ignoredFailure::set
);
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, shardFailedRequests.length);
CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry) shardFailedRequest.request;
ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request;
// the shard the request was sent to and the shard to be failed should be the same
assertEquals(shardRoutingEntry.getShardRouting(), replica);
assertEquals(shardEntry.getShardId(), replica.shardId());
assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId());
if (randomBoolean()) {
// simulate success
transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
@ -553,7 +554,7 @@ public class TransportReplicationActionTests extends ESTestCase {
} else if (randomBoolean()) {
// simulate the primary has been demoted
transport.handleRemoteError(shardFailedRequest.requestId,
new ShardStateAction.NoLongerPrimaryShardException(shardRoutingEntry.getShardRouting().shardId(),
new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(),
"shard-failed-test"));
assertFalse(success.get());
assertNotNull(failure.get());
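The test above drives the outcomes of the proxy's failShard call: a plain response completes it, a remote NoLongerPrimaryShardException signals that this primary has been demoted, and transient transport failures are roughly what the ignored-failure path is for. A standalone sketch of that three-callback contract, with illustrative names rather than the real transport plumbing:

import java.util.function.Consumer;

class FailShardCallbacks {
    enum MasterReply { ACKED, NO_LONGER_PRIMARY, CHANNEL_CLOSED }

    // exactly one of the three callbacks fires, depending on the master's answer
    static void failShard(MasterReply reply, Runnable onSuccess,
                          Consumer<Exception> onPrimaryDemoted,
                          Consumer<Exception> onIgnoredFailure) {
        switch (reply) {
            case ACKED -> onSuccess.run();
            case NO_LONGER_PRIMARY ->
                // the primary term we sent is stale: this primary was demoted
                onPrimaryDemoted.accept(new IllegalStateException("no longer primary"));
            case CHANNEL_CLOSED ->
                // transient transport issue: safe to ignore and carry on
                onIgnoredFailure.accept(new RuntimeException("node closed"));
        }
    }

    public static void main(String[] args) {
        for (MasterReply reply : MasterReply.values()) {
            failShard(reply,
                () -> System.out.println(reply + " -> success"),
                e -> System.out.println(reply + " -> demoted: " + e.getMessage()),
                e -> System.out.println(reply + " -> ignored: " + e.getMessage()));
        }
    }
}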

View File

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -51,7 +50,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -79,7 +77,8 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
.build());
numberOfReplicas = randomIntBetween(2, 16);
metaData = MetaData.builder()
.put(IndexMetaData.builder(INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(numberOfReplicas))
.put(IndexMetaData.builder(INDEX).settings(settings(Version.CURRENT))
.numberOfShards(1).numberOfReplicas(numberOfReplicas).primaryTerm(0, randomIntBetween(2, 10)))
.build();
routingTable = RoutingTable.builder()
.addAsNew(metaData.index(INDEX))
@ -89,8 +88,8 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
}
public void testEmptyTaskListProducesSameClusterState() throws Exception {
List<ShardStateAction.ShardRoutingEntry> tasks = Collections.emptyList();
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result =
List<ShardStateAction.ShardEntry> tasks = Collections.emptyList();
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result =
executor.execute(clusterState, tasks);
assertTasksSuccessful(tasks, result, clusterState, false);
}
@ -98,35 +97,35 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
public void testDuplicateFailuresAreOkay() throws Exception {
String reason = "test duplicate failures are okay";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardRoutingEntry> tasks = createExistingShards(currentState, reason);
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = executor.execute(currentState, tasks);
List<ShardStateAction.ShardEntry> tasks = createExistingShards(currentState, reason);
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result = executor.execute(currentState, tasks);
assertTasksSuccessful(tasks, result, clusterState, true);
}
public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {
String reason = "test non existent shards are marked as successful";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardRoutingEntry> tasks = createNonExistentShards(currentState, reason);
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = executor.execute(clusterState, tasks);
List<ShardStateAction.ShardEntry> tasks = createNonExistentShards(currentState, reason);
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result = executor.execute(clusterState, tasks);
assertTasksSuccessful(tasks, result, clusterState, false);
}
public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Exception {
String reason = "test trivially successful tasks batched with failing tasks";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardRoutingEntry> failingTasks = createExistingShards(currentState, reason);
List<ShardStateAction.ShardRoutingEntry> nonExistentTasks = createNonExistentShards(currentState, reason);
List<ShardStateAction.ShardEntry> failingTasks = createExistingShards(currentState, reason);
List<ShardStateAction.ShardEntry> nonExistentTasks = createNonExistentShards(currentState, reason);
ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) {
@Override
RoutingAllocation.Result applyFailedShards(ClusterState currentState, List<FailedRerouteAllocation.FailedShard> failedShards) {
throw new RuntimeException("simulated applyFailedShards failure");
}
};
List<ShardStateAction.ShardRoutingEntry> tasks = new ArrayList<>();
List<ShardStateAction.ShardEntry> tasks = new ArrayList<>();
tasks.addAll(failingTasks);
tasks.addAll(nonExistentTasks);
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = failingExecutor.execute(currentState, tasks);
Map<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result = failingExecutor.execute(currentState, tasks);
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure"))));
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())));
assertTaskResults(taskResultMap, result, currentState, false);
@ -135,16 +134,20 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
public void testIllegalShardFailureRequests() throws Exception {
String reason = "test illegal shard failure requests";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardRoutingEntry> failingTasks = createExistingShards(currentState, reason);
List<ShardStateAction.ShardRoutingEntry> tasks = new ArrayList<>();
for (ShardStateAction.ShardRoutingEntry failingTask : failingTasks) {
tasks.add(new ShardStateAction.ShardRoutingEntry(failingTask.getShardRouting(), randomInvalidSourceShard(currentState, failingTask.getShardRouting()), failingTask.message, failingTask.failure));
List<ShardStateAction.ShardEntry> failingTasks = createExistingShards(currentState, reason);
List<ShardStateAction.ShardEntry> tasks = new ArrayList<>();
for (ShardStateAction.ShardEntry failingTask : failingTasks) {
long primaryTerm = currentState.metaData().index(failingTask.shardId.getIndex()).primaryTerm(failingTask.shardId.id());
tasks.add(new ShardStateAction.ShardEntry(failingTask.shardId, failingTask.allocationId,
randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure));
}
Map<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(
Function.identity(),
task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.getShardRouting().shardId(), "source shard [" + task.sourceShardRouting + "] is neither the local allocation nor the primary allocation"))));
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = executor.execute(currentState, tasks);
task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId,
"primary term [" + task.primaryTerm + "] did not match current primary term [" +
currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]"))));
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result = executor.execute(currentState, tasks);
assertTaskResults(taskResultMap, result, currentState, false);
}
@ -163,7 +166,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
return ClusterState.builder(stateAfterReroute).routingTable(afterStart).build();
}
private List<ShardStateAction.ShardRoutingEntry> createExistingShards(ClusterState currentState, String reason) {
private List<ShardStateAction.ShardEntry> createExistingShards(ClusterState currentState, String reason) {
List<ShardRouting> shards = new ArrayList<>();
GroupShardsIterator shardGroups =
currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true);
@ -182,7 +185,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
return toTasks(currentState, shardsToFail, indexUUID, reason);
}
private List<ShardStateAction.ShardRoutingEntry> createNonExistentShards(ClusterState currentState, String reason) {
private List<ShardStateAction.ShardEntry> createNonExistentShards(ClusterState currentState, String reason) {
// add shards from a non-existent index
String nonExistentIndexUUID = "non-existent";
Index index = new Index("non-existent", nonExistentIndexUUID);
@ -196,17 +199,14 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
nonExistentShards.add(nonExistentShardRouting(index, nodeIds, false));
}
List<ShardStateAction.ShardRoutingEntry> existingShards = createExistingShards(currentState, reason);
List<ShardStateAction.ShardRoutingEntry> shardsWithMismatchedAllocationIds = new ArrayList<>();
for (ShardStateAction.ShardRoutingEntry existingShard : existingShards) {
ShardRouting sr = existingShard.getShardRouting();
ShardRouting nonExistentShardRouting =
TestShardRouting.newShardRouting(sr.shardId(), sr.currentNodeId(), sr.relocatingNodeId(), sr.restoreSource(), sr.primary(), sr.state());
shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardRoutingEntry(nonExistentShardRouting, nonExistentShardRouting, existingShard.message, existingShard.failure));
List<ShardStateAction.ShardEntry> existingShards = createExistingShards(currentState, reason);
List<ShardStateAction.ShardEntry> shardsWithMismatchedAllocationIds = new ArrayList<>();
for (ShardStateAction.ShardEntry existingShard : existingShards) {
shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardEntry(existingShard.shardId, UUIDs.randomBase64UUID(), 0L, existingShard.message, existingShard.failure));
}
List<ShardStateAction.ShardRoutingEntry> tasks = new ArrayList<>();
nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.ShardRoutingEntry(shard, shard, reason, new CorruptIndexException("simulated", nonExistentIndexUUID))));
List<ShardStateAction.ShardEntry> tasks = new ArrayList<>();
nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.ShardEntry(shard.shardId(), shard.allocationId().getId(), 0L, reason, new CorruptIndexException("simulated", nonExistentIndexUUID))));
tasks.addAll(shardsWithMismatchedAllocationIds);
return tasks;
}
@ -216,41 +216,42 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
}
private static void assertTasksSuccessful(
List<ShardStateAction.ShardRoutingEntry> tasks,
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result,
List<ShardStateAction.ShardEntry> tasks,
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result,
ClusterState clusterState,
boolean clusterStateChanged
) {
Map<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()));
assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged);
}
private static void assertTaskResults(
Map<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap,
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result,
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap,
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result,
ClusterState clusterState,
boolean clusterStateChanged
) {
// there should be as many task results as tasks
assertEquals(taskResultMap.size(), result.executionResults.size());
for (Map.Entry<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
for (Map.Entry<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
// every task should have a corresponding task result
assertTrue(result.executionResults.containsKey(entry.getKey()));
// the task results are as expected
assertEquals(entry.getValue().isSuccess(), result.executionResults.get(entry.getKey()).isSuccess());
assertEquals(entry.getKey().toString(), entry.getValue().isSuccess(), result.executionResults.get(entry.getKey()).isSuccess());
}
List<ShardRouting> shards = clusterState.getRoutingTable().allShards();
for (Map.Entry<ShardStateAction.ShardRoutingEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
for (Map.Entry<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
if (entry.getValue().isSuccess()) {
// the shard was successfully failed and so should not
// be in the routing table
// the shard was successfully failed and so should not be in the routing table
for (ShardRouting shard : shards) {
if (entry.getKey().getShardRouting().allocationId() != null) {
assertThat(shard.allocationId(), not(equalTo(entry.getKey().getShardRouting().allocationId())));
if (shard.assignedToNode()) {
assertFalse("entry key " + entry.getKey() + ", shard routing " + shard,
entry.getKey().getShardId().equals(shard.shardId()) &&
entry.getKey().getAllocationId().equals(shard.allocationId().getId()));
}
}
} else {
@ -268,50 +269,15 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
}
}
private static List<ShardStateAction.ShardRoutingEntry> toTasks(ClusterState currentState, List<ShardRouting> shards, String indexUUID, String message) {
private static List<ShardStateAction.ShardEntry> toTasks(ClusterState currentState, List<ShardRouting> shards, String indexUUID, String message) {
return shards
.stream()
.map(shard -> new ShardStateAction.ShardRoutingEntry(shard, randomValidSourceShard(currentState, shard), message, new CorruptIndexException("simulated", indexUUID)))
.map(shard -> new ShardStateAction.ShardEntry(
shard.shardId(),
shard.allocationId().getId(),
randomBoolean() ? 0L : currentState.metaData().getIndexSafe(shard.index()).primaryTerm(shard.id()),
message,
new CorruptIndexException("simulated", indexUUID)))
.collect(Collectors.toList());
}
private static ShardRouting randomValidSourceShard(ClusterState currentState, ShardRouting shardRouting) {
// for the request node ID to be valid, either the request is
// from the node the shard is assigned to, or the request is
// from the node holding the primary shard
if (randomBoolean()) {
// request from local node
return shardRouting;
} else {
// request from primary node unless in the case of
// non-existent shards there is not one and we fallback to
// the local node
ShardRouting primaryNodeId = primaryShard(currentState, shardRouting);
return primaryNodeId != null ? primaryNodeId : shardRouting;
}
}
private static ShardRouting randomInvalidSourceShard(ClusterState currentState, ShardRouting shardRouting) {
ShardRouting primaryShard = primaryShard(currentState, shardRouting);
Set<ShardRouting> shards =
currentState
.routingTable()
.allShards()
.stream()
.filter(shard -> !shard.isSameAllocation(shardRouting))
.filter(shard -> !shard.isSameAllocation(primaryShard))
.collect(Collectors.toSet());
if (!shards.isEmpty()) {
return randomSubsetOf(1, shards.toArray(new ShardRouting[0])).get(0);
} else {
return
TestShardRouting.newShardRouting(shardRouting.shardId(), UUIDs.randomBase64UUID(random()), randomBoolean(),
randomFrom(ShardRoutingState.values()));
}
}
private static ShardRouting primaryShard(ClusterState currentState, ShardRouting shardRouting) {
IndexShardRoutingTable indexShard = currentState.getRoutingTable().shardRoutingTableOrNull(shardRouting.shardId());
return indexShard == null ? null : indexShard.primaryShard();
}
}
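Taken together, these tests pin down the validation at the heart of this change: a task carrying a stale primary term is rejected with NoLongerPrimaryShardException, while a term of 0 (used above for tasks submitted without a term) skips the check. A minimal standalone model of that validation, not the actual executor:

class PrimaryTermCheckSketch {
    static final class NoLongerPrimaryException extends RuntimeException {
        NoLongerPrimaryException(String msg) { super(msg); }
    }

    // term 0 means "no term supplied"; any positive term must match exactly
    static void validate(long taskPrimaryTerm, long currentPrimaryTerm) {
        if (taskPrimaryTerm > 0 && taskPrimaryTerm != currentPrimaryTerm) {
            throw new NoLongerPrimaryException("primary term [" + taskPrimaryTerm
                + "] did not match current primary term [" + currentPrimaryTerm + "]");
        }
    }

    public static void main(String[] args) {
        validate(0, 5);  // no term supplied: accepted
        validate(5, 5);  // current primary: accepted
        try {
            validate(4, 5); // stale term: the sender was demoted in the meantime
        } catch (NoLongerPrimaryException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}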

View File

@ -29,9 +29,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -61,6 +59,7 @@ import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
public class ShardStateActionTests extends ESTestCase {
@ -89,9 +88,9 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener) {
onBeforeWaitForNewMasterAndRetry.run();
super.waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
super.waitForNewMasterAndRetry(actionName, observer, shardEntry, listener);
onAfterWaitForNewMasterAndRetry.run();
}
}
@ -140,7 +139,7 @@ public class ShardStateActionTests extends ESTestCase {
CountDownLatch latch = new CountDownLatch(1);
ShardRouting shardRouting = getRandomShardRouting(index);
shardStateAction.shardFailed(shardRouting, shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
@ -158,10 +157,11 @@ public class ShardStateActionTests extends ESTestCase {
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, capturedRequests.length);
// the request is a shard failed request
assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardRoutingEntry.class)));
ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry) capturedRequests[0].request;
assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardEntry.class)));
ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) capturedRequests[0].request;
// for the right shard
assertEquals(shardRouting, shardRoutingEntry.getShardRouting());
assertEquals(shardEntry.shardId, shardRouting.shardId());
assertEquals(shardEntry.allocationId, shardRouting.allocationId().getId());
// sent to the master
assertEquals(clusterService.state().nodes().getMasterNode().getId(), capturedRequests[0].node.getId());
@ -188,7 +188,7 @@ public class ShardStateActionTests extends ESTestCase {
});
ShardRouting failedShard = getRandomShardRouting(index);
shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
@ -237,7 +237,7 @@ public class ShardStateActionTests extends ESTestCase {
setUpMasterRetryVerification(numberOfRetries, retries, latch, retryLoop);
ShardRouting failedShard = getRandomShardRouting(index);
shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
@ -273,7 +273,7 @@ public class ShardStateActionTests extends ESTestCase {
AtomicBoolean failure = new AtomicBoolean();
ShardRouting failedShard = getRandomShardRouting(index);
shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
failure.set(false);
@ -305,7 +305,7 @@ public class ShardStateActionTests extends ESTestCase {
ShardRouting failedShard = getRandomShardRouting(index);
RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build();
setState(clusterService, ClusterState.builder(clusterService.state()).routingTable(routingTable));
shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
@ -334,13 +334,12 @@ public class ShardStateActionTests extends ESTestCase {
ShardRouting failedShard = getRandomShardRouting(index);
String nodeId = randomFrom(clusterService.state().nodes().getNodes().keys().toArray(String.class));
AtomicReference<Throwable> failure = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
ShardRouting sourceFailedShard = TestShardRouting.newShardRouting(failedShard.shardId(), nodeId, randomBoolean(), randomFrom(ShardRoutingState.values()));
shardStateAction.shardFailed(failedShard, sourceFailedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id());
assertThat(primaryTerm, greaterThanOrEqualTo(1L));
shardStateAction.remoteShardFailed(failedShard, primaryTerm + 1, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
failure.set(null);
@ -355,7 +354,7 @@ public class ShardStateActionTests extends ESTestCase {
});
ShardStateAction.NoLongerPrimaryShardException catastrophicError =
new ShardStateAction.NoLongerPrimaryShardException(failedShard.shardId(), "source shard [" + sourceFailedShard + " is neither the local allocation nor the primary allocation");
new ShardStateAction.NoLongerPrimaryShardException(failedShard.shardId(), "dummy failure");
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
transport.handleRemoteError(capturedRequests[0].requestId, catastrophicError);

View File

@ -23,7 +23,6 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
@ -70,7 +69,7 @@ public class AllocationIdTests extends ESTestCase {
assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
assertThat(shard.allocationId().getRelocationId(), notNullValue());
ShardRouting target = shard.buildTargetRelocatingShard();
ShardRouting target = shard.getTargetRelocatingShard();
assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId()));
assertThat(target.allocationId().getRelocationId(), equalTo(shard.allocationId().getId()));
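The asserts above capture how relocation allocation ids pair up: the target's id is the source's relocationId and vice versa, so either side can identify its counterpart. A standalone sketch of that mirrored pairing (stand-in types, not the real AllocationId class):

import java.util.UUID;

class AllocationIdSketch {
    record AllocId(String id, String relocationId) {}

    static AllocId newRelocation(AllocId current, String targetId) {
        return new AllocId(current.id(), targetId);             // source keeps its id
    }

    static AllocId targetOf(AllocId source) {
        return new AllocId(source.relocationId(), source.id()); // mirrored pair
    }

    public static void main(String[] args) {
        AllocId source = newRelocation(
            new AllocId(UUID.randomUUID().toString(), null),
            UUID.randomUUID().toString());
        AllocId target = targetOf(source);
        assert target.id().equals(source.relocationId());
        assert target.relocationId().equals(source.id());
        System.out.println("source=" + source + "\ntarget=" + target);
    }
}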

View File

@ -23,7 +23,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.test.ESTestCase;
@ -86,7 +85,7 @@ public class ShardRoutingTests extends ESTestCase {
assertFalse(startedShard1.isRelocationTarget());
ShardRouting sourceShard0a = startedShard0.relocate("node2", -1);
assertFalse(sourceShard0a.isRelocationTarget());
ShardRouting targetShard0a = sourceShard0a.buildTargetRelocatingShard();
ShardRouting targetShard0a = sourceShard0a.getTargetRelocatingShard();
assertTrue(targetShard0a.isRelocationTarget());
ShardRouting sourceShard0b = startedShard0.relocate("node2", -1);
ShardRouting sourceShard1 = startedShard1.relocate("node2", -1);

View File

@ -37,6 +37,8 @@ import org.elasticsearch.test.ESAllocationTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@ -218,9 +220,6 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(UNASSIGNED));
logger.info("fail the shard again, check that nothing happens");
assertThat(strategy.applyFailedShard(clusterState, shardToFail).changed(), equalTo(false));
}
public void testFirstAllocationFailureSingleNode() {
@ -274,9 +273,6 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
}
logger.info("fail the shard again, see that nothing happens");
assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false));
}
public void testSingleShardMultipleAllocationFailures() {
@ -317,11 +313,17 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
int shardsToFail = randomIntBetween(1, numberOfReplicas);
ArrayList<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>();
RoutingNodes routingNodes = clusterState.getRoutingNodes();
Set<String> failedNodes = new HashSet<>();
Set<ShardRouting> shardRoutingsToFail = new HashSet<>();
for (int i = 0; i < shardsToFail; i++) {
String n = "node" + Integer.toString(randomInt(numberOfReplicas));
logger.info("failing shard on node [{}]", n);
ShardRouting shardToFail = routingNodes.node(n).iterator().next();
failedShards.add(new FailedRerouteAllocation.FailedShard(shardToFail, null, null));
String failedNode = "node" + Integer.toString(randomInt(numberOfReplicas));
logger.info("failing shard on node [{}]", failedNode);
ShardRouting shardToFail = routingNodes.node(failedNode).iterator().next();
if (shardRoutingsToFail.contains(shardToFail) == false) {
failedShards.add(new FailedRerouteAllocation.FailedShard(shardToFail, null, null));
failedNodes.add(failedNode);
shardRoutingsToFail.add(shardToFail);
}
}
routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable();
@ -329,8 +331,14 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.getRoutingNodes();
for (FailedRerouteAllocation.FailedShard failedShard : failedShards) {
if (!routingNodes.node(failedShard.shard.currentNodeId()).isEmpty()) {
fail("shard " + failedShard + " was re-assigned to it's node");
if (routingNodes.getByAllocationId(failedShard.routingEntry.shardId(), failedShard.routingEntry.allocationId().getId()) != null) {
fail("shard " + failedShard + " was not failed");
}
}
for (String failedNode : failedNodes) {
if (!routingNodes.node(failedNode).isEmpty()) {
fail("shard was re-assigned to failed node " + failedNode);
}
}
}
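// A standalone sketch, separate from this diff: the Set-based bookkeeping
// added above exists because random picks can repeat, and the stricter
// applyFailedShards no longer tolerates failing the same routing entry twice.
// Set.add returning false is what filters the duplicates.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

class DedupFailuresSketch {
    public static void main(String[] args) {
        Random random = new Random();
        List<String> toFail = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (int i = 0; i < 5; i++) {
            String candidate = "node" + random.nextInt(3) + "/shard0";
            if (seen.add(candidate)) { // false for a repeated pick
                toFail.add(candidate);
            }
        }
        System.out.println("unique failures submitted: " + toFail);
    }
}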
@ -390,9 +398,6 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
}
logger.info("fail the shard again, see that nothing happens");
assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false));
}
public void testRebalanceFailure() {
@ -530,10 +535,6 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
// simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it
routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
assertThat(routingResult.changed(), equalTo(false));
}
public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToElect() {
@ -575,9 +576,5 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
// simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it
routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
assertThat(routingResult.changed(), equalTo(false));
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
@ -48,7 +47,7 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase {
logger.info("--> building initial cluster state");
final IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(3).numberOfReplicas(0)
.numberOfShards(2).numberOfReplicas(0)
.build();
final Index index = indexMetaData.getIndex();
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
@ -56,69 +55,27 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase {
.metaData(MetaData.builder().put(indexMetaData, false));
final ShardRouting initShard = TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.INITIALIZING);
final ShardRouting startedShard = TestShardRouting.newShardRouting(new ShardId(index, 1), "node2", true, ShardRoutingState.STARTED);
final ShardRouting relocatingShard = TestShardRouting.newShardRouting(new ShardId(index, 2), "node1", "node2", true, ShardRoutingState.RELOCATING);
final ShardRouting relocatingShard = TestShardRouting.newShardRouting(new ShardId(index, 1), "node1", "node2", true, ShardRoutingState.RELOCATING);
stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index)
.addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId()).addShard(initShard).build())
.addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId()).addShard(startedShard).build())
.addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId()).addShard(relocatingShard).build())).build());
ClusterState state = stateBuilder.build();
logger.info("--> test starting of shard");
RoutingAllocation.Result result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(initShard.shardId(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(),
ShardRoutingState.INITIALIZING, initShard.allocationId())), false);
RoutingAllocation.Result result = allocation.applyStartedShards(state, Arrays.asList(initShard), false);
assertTrue("failed to start " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
assertTrue(initShard + "isn't started \ncurrent routing table:" + result.routingTable().prettyPrint(),
result.routingTable().index("test").shard(initShard.id()).allShardsStarted());
logger.info("--> testing shard variants that shouldn't match the initializing shard");
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(initShard.shardId(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(),
ShardRoutingState.INITIALIZING)), false);
assertFalse("wrong allocation id flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(initShard.shardId(), "some_node", initShard.currentNodeId(), initShard.primary(),
ShardRoutingState.INITIALIZING, AllocationId.newTargetRelocation(AllocationId.newRelocation(initShard.allocationId())))), false);
assertFalse("relocating shard from node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
logger.info("--> testing double starting");
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(startedShard.shardId(), startedShard.currentNodeId(), startedShard.relocatingNodeId(), startedShard.primary(),
ShardRoutingState.INITIALIZING, startedShard.allocationId())), false);
assertFalse("duplicate starting of the same shard should be ignored \ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
logger.info("--> testing starting of relocating shards");
final AllocationId targetAllocationId = AllocationId.newTargetRelocation(relocatingShard.allocationId());
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(relocatingShard.shardId(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(),
ShardRoutingState.INITIALIZING, targetAllocationId)), false);
result = allocation.applyStartedShards(state, Arrays.asList(relocatingShard.getTargetRelocatingShard()), false);
assertTrue("failed to start " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
ShardRouting shardRouting = result.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0);
assertThat(shardRouting.state(), equalTo(ShardRoutingState.STARTED));
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
assertThat(shardRouting.relocatingNodeId(), nullValue());
logger.info("--> testing shard variants that shouldn't match the initializing relocating shard");
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(relocatingShard.shardId(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(),
ShardRoutingState.INITIALIZING)));
assertFalse("wrong allocation id shouldn't start shard" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
result = allocation.applyStartedShards(state, Arrays.asList(
TestShardRouting.newShardRouting(relocatingShard.shardId(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(),
ShardRoutingState.INITIALIZING, relocatingShard.allocationId())), false);
assertFalse("wrong allocation id shouldn't start shard even if relocatingId==shard.id" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed());
}
}

View File

@ -265,7 +265,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0));
RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"),
emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.buildTargetRelocatingShard(), test_2);
emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2);
assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null"));
assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null"));
assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev"));
@ -283,7 +283,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
other_0 = ShardRoutingHelper.relocate(other_0, "node1");
node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"),
emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.buildTargetRelocatingShard(), test_2, other_0.buildTargetRelocatingShard());
emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2, other_0.getTargetRelocatingShard());
if (other_0.primary()) {
assertEquals(10100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null"));
assertEquals(10090L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null"));

View File

@ -951,7 +951,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
networkPartition.startDisrupting();
service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new
service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new
ShardStateAction.Listener() {
@Override
public void onSuccess() {

View File

@ -563,7 +563,7 @@ public class NodeJoinControllerTests extends ESTestCase {
}
@Override
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards,
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards,
boolean withReroute) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}

View File

@ -477,7 +477,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
}
@Override
public void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception, Runnable onSuccess,
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
throw new UnsupportedOperationException();
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.cluster;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
@ -41,6 +42,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.AliasValidator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -53,12 +55,12 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RandomAllocationDeciderTests;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
@ -78,6 +80,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom;
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
@ -91,10 +94,11 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ClusterStateChanges {
public class ClusterStateChanges extends AbstractComponent {
private final ClusterService clusterService;
private final AllocationService allocationService;
private final ShardStateAction.ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
private final ShardStateAction.ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor;
// transport actions
private final TransportCloseIndexAction transportCloseIndexAction;
@ -105,14 +109,16 @@ public class ClusterStateChanges {
private final TransportCreateIndexAction transportCreateIndexAction;
public ClusterStateChanges() {
Settings settings = Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build();
super(Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build());
allocationService = new AllocationService(settings, new AllocationDeciders(settings,
final AllocationService allocationService = new AllocationService(settings, new AllocationDeciders(settings,
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(settings),
new ReplicaAfterPrimaryActiveAllocationDecider(settings),
new RandomAllocationDeciderTests.RandomAllocationDecider(getRandom())))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings),
EmptyClusterInfoService.INSTANCE);
shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger);
shardStartedClusterStateTaskExecutor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger);
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ActionFilters actionFilters = new ActionFilters(Collections.emptySet());
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
@ -199,13 +205,26 @@ public class ClusterStateChanges {
}
public ClusterState applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(clusterState, failedShards);
return ClusterState.builder(clusterState).routingResult(rerouteResult).build();
List<ShardStateAction.ShardEntry> entries = failedShards.stream().map(failedShard ->
new ShardStateAction.ShardEntry(failedShard.routingEntry.shardId(), failedShard.routingEntry.allocationId().getId(),
0L, failedShard.message, failedShard.failure))
.collect(Collectors.toList());
try {
return shardFailedClusterStateTaskExecutor.execute(clusterState, entries).resultingState;
} catch (Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
RoutingAllocation.Result rerouteResult = allocationService.applyStartedShards(clusterState, startedShards);
return ClusterState.builder(clusterState).routingResult(rerouteResult).build();
List<ShardStateAction.ShardEntry> entries = startedShards.stream().map(startedShard ->
new ShardStateAction.ShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), 0L, "shard started", null))
.collect(Collectors.toList());
try {
return shardStartedClusterStateTaskExecutor.execute(clusterState, entries).resultingState;
} catch (Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
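// A standalone sketch, separate from this diff: both methods above now reduce
// a routing entry to the minimal task payload the reworked ShardStateAction
// transmits (shard id, allocation id, primary term, message), then hand the
// tasks to the same executors the master uses. The record types below are
// illustrative stand-ins, not the real ShardEntry.
import java.util.List;
import java.util.stream.Collectors;

class TaskMappingSketch {
    record Routing(String shardId, String allocationId) {}
    record ShardEntry(String shardId, String allocationId, long primaryTerm, String message) {}

    public static void main(String[] args) {
        List<Routing> started = List.of(new Routing("[test][0]", "alloc-1"));
        // primary term 0 mirrors the 0L above: no term check for these tasks
        List<ShardEntry> tasks = started.stream()
            .map(r -> new ShardEntry(r.shardId(), r.allocationId(), 0L, "shard started"))
            .collect(Collectors.toList());
        System.out.println(tasks);
    }
}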
private <Request extends MasterNodeRequest<Request>, Response extends ActionResponse> ClusterState execute(