diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index 061ec41039b..d5cc35b2205 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -21,7 +21,12 @@ package org.elasticsearch.action.admin.cluster.reroute; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; +import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest; +import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; @@ -29,14 +34,26 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingExplanations; +import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenIntMap; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class TransportClusterRerouteAction extends TransportMasterNodeAction { private final AllocationService allocationService; @@ -69,18 +86,71 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction listener) { - ActionListener logWrapper = ActionListener.wrap( - response -> { - if (request.dryRun() == false) { - response.getExplanations().getYesDecisionMessages().forEach(logger::info); - } - listener.onResponse(response); - }, - listener::onFailure - ); + Map> stalePrimaryAllocations = new HashMap<>(); + for (AllocationCommand command : request.getCommands().commands()) { + if (command instanceof AllocateStalePrimaryAllocationCommand) { + final AllocateStalePrimaryAllocationCommand cmd = (AllocateStalePrimaryAllocationCommand) command; + stalePrimaryAllocations.computeIfAbsent(cmd.index(), k -> new ArrayList<>()).add(cmd); + } + } + if (stalePrimaryAllocations.isEmpty()) { + submitStateUpdate(request, listener); + } else { + verifyThenSubmitUpdate(request, listener, stalePrimaryAllocations); + } + } - clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, - allocationService, request, logWrapper)); + private void verifyThenSubmitUpdate(ClusterRerouteRequest request, ActionListener listener, + Map> stalePrimaryAllocations) { + transportService.sendRequest(transportService.getLocalNode(), IndicesShardStoresAction.NAME, + new IndicesShardStoresRequest().indices(stalePrimaryAllocations.keySet().toArray(Strings.EMPTY_ARRAY)), + new ActionListenerResponseHandler<>( + ActionListener.wrap( + response -> { + ImmutableOpenMap>> status = + response.getStoreStatuses(); + Exception e = null; + for (Map.Entry> entry : stalePrimaryAllocations.entrySet()) { + final String index = entry.getKey(); + final ImmutableOpenIntMap> indexStatus = status.get(index); + assert indexStatus != null; + for (AbstractAllocateAllocationCommand command : entry.getValue()) { + final List shardStatus = + indexStatus.get(command.shardId()); + if (shardStatus == null || shardStatus.isEmpty()) { + e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException( + "No data for shard [" + command.shardId() + "] of index [" + index + "] found on any node") + ); + } else if (shardStatus.stream().noneMatch(storeStatus -> { + final DiscoveryNode node = storeStatus.getNode(); + final String nodeInCommand = command.node(); + return nodeInCommand.equals(node.getName()) || nodeInCommand.equals(node.getId()); + })) { + e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException( + "No data for shard [" + command.shardId() + "] of index [" + index + "] found on node [" + + command.node() + ']')); + } + } + } + if (e == null) { + submitStateUpdate(request, listener); + } else { + listener.onFailure(e); + } + }, listener::onFailure + ), IndicesShardStoresResponse::new)); + } + + private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener listener) { + clusterService.submitStateUpdateTask("cluster_reroute (api)", + new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request, + ActionListener.wrap( + response -> { + if (request.dryRun() == false) { + response.getExplanations().getYesDecisionMessages().forEach(logger::info); + } + listener.onResponse(response); + }, listener::onFailure))); } static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java index d87de21bc48..ed348539d35 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java @@ -283,7 +283,11 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon } IndicesShardStoresResponse() { - this(ImmutableOpenMap.>>of(), Collections.emptyList()); + this(ImmutableOpenMap.of(), Collections.emptyList()); + } + + public IndicesShardStoresResponse(StreamInput in) throws IOException { + readFrom(in); } /** diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index e2777616f42..213d8175fcd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.shard.IndexShard; @@ -50,6 +51,7 @@ import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -175,15 +177,17 @@ public class PrimaryAllocationIT extends ESIntegTestCase { .getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT)); logger.info("--> force allocation of stale copy to node that does not have shard copy"); - client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand("test", 0, - dataNodeWithNoShardCopy, true)).get(); + Throwable iae = expectThrows( + IllegalArgumentException.class, + () -> client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand("test", 0, + dataNodeWithNoShardCopy, true)).get()); + assertThat(iae.getMessage(), equalTo("No data for shard [0] of index [test] found on any node")); logger.info("--> wait until shard is failed and becomes unassigned again"); - assertBusy(() -> - assertTrue(client().admin().cluster().prepareState().get().getState().toString(), - client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned())); + assertTrue(client().admin().cluster().prepareState().get().getState().toString(), + client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()); assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test") - .getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); + .getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT)); } public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { @@ -261,6 +265,43 @@ public class PrimaryAllocationIT extends ESIntegTestCase { assertThat(newHistoryUUIds, hasSize(1)); } + public void testForceStaleReplicaToBePromotedToPrimaryOnWrongNode() throws Exception { + String master = internalCluster().startMasterOnlyNode(Settings.EMPTY); + internalCluster().startDataOnlyNodes(2); + final String idxName = "test"; + assertAcked(client().admin().indices().prepareCreate(idxName) + .setSettings(Settings.builder().put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1)).get()); + ensureGreen(); + createStaleReplicaScenario(master); + internalCluster().startDataOnlyNodes(2); + final int shardId = 0; + final List nodeNames = new ArrayList<>(Arrays.asList(internalCluster().getNodeNames())); + nodeNames.remove(master); + client().admin().indices().prepareShardStores(idxName).get().getStoreStatuses().get(idxName) + .get(shardId).forEach(status -> nodeNames.remove(status.getNode().getName())); + assertThat(nodeNames, hasSize(1)); + final String nodeWithoutData = nodeNames.get(0); + Throwable iae = expectThrows( + IllegalArgumentException.class, + () -> client().admin().cluster().prepareReroute() + .add(new AllocateStalePrimaryAllocationCommand(idxName, shardId, nodeWithoutData, true)).get()); + assertThat( + iae.getMessage(), + equalTo("No data for shard [" + shardId + "] of index [" + idxName + "] found on node [" + nodeWithoutData + ']')); + } + + public void testForceStaleReplicaToBePromotedForMissingIndex() { + internalCluster().startMasterOnlyNode(Settings.EMPTY); + final String dataNode = internalCluster().startDataOnlyNode(); + final String idxName = "test"; + IndexNotFoundException ex = expectThrows( + IndexNotFoundException.class, + () -> client().admin().cluster().prepareReroute() + .add(new AllocateStalePrimaryAllocationCommand(idxName, 0, dataNode, true)).get()); + assertThat(ex.getIndex().getName(), equalTo(idxName)); + } + public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException { String node = internalCluster().startNode(); client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder()