diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java index ac125f7d50c..b70678c1262 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java @@ -5,11 +5,12 @@ */ package org.elasticsearch.xpack.core.ilm; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -55,18 +56,23 @@ public class SetSingleNodeAllocateStep extends AsyncActionStep { super(key, nextStepKey, client); } + @Override + public boolean isRetryable() { + return true; + } + @Override public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, ClusterStateObserver observer, Listener listener) { final RoutingNodes routingNodes = clusterState.getRoutingNodes(); RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, routingNodes, clusterState, null, System.nanoTime()); List validNodeIds = new ArrayList<>(); + String indexName = indexMetaData.getIndex().getName(); final Map> routingsByShardId = clusterState.getRoutingTable() - .allShards(indexMetaData.getIndex().getName()) + .allShards(indexName) .stream() .collect(Collectors.groupingBy(ShardRouting::shardId)); - if (routingsByShardId.isEmpty() == false) { for (RoutingNode node : routingNodes) { boolean canAllocateOneCopyOfEachShard = routingsByShardId.values().stream() // For each shard @@ -80,21 +86,24 @@ public class SetSingleNodeAllocateStep extends AsyncActionStep { // Shuffle the list of nodes so the one we pick is random Randomness.shuffle(validNodeIds); Optional nodeId = validNodeIds.stream().findAny(); + if (nodeId.isPresent()) { Settings settings = Settings.builder() .put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", nodeId.get()).build(); - UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName()) + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName) .masterNodeTimeout(getMasterTimeout(clusterState)) .settings(settings); getClient().admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure)); } else { - // No nodes currently match the allocation rules so just wait until there is one that does - logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink"); - listener.onResponse(false); + // No nodes currently match the allocation rules, so report this as an error and we'll retry + logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink", indexName); + listener.onFailure(new NoNodeAvailableException("could not find any nodes to allocate index [" + indexName + "] onto" + + " prior to shrink")); } } else { - // There are no shards for the index, the index might be gone + // There are no shards for the index, the index might be gone. Even though this is a retryable step ILM will not retry in + // this case as we're using the periodic loop to trigger the retries and that is run over *existing* indices. listener.onFailure(new IndexNotFoundException(indexMetaData.getIndex())); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStepTests.java index 8d4aa33a9e4..1f1607aa069 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStepTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -45,6 +46,7 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase { @@ -563,22 +565,23 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase actionCompleted = new SetOnce<>(); + SetOnce actionCompleted = new SetOnce<>(); step.performAction(indexMetaData, clusterState, null, new Listener() { @Override public void onResponse(boolean complete) { - actionCompleted.set(complete); + throw new AssertionError("Unexpected method call"); } @Override public void onFailure(Exception e) { - throw new AssertionError("Unexpected method call", e); + actionCompleted.set(e); } }); - assertEquals(false, actionCompleted.get()); + Exception failure = actionCompleted.get(); + assertThat(failure, instanceOf(NoNodeAvailableException.class)); Mockito.verifyZeroInteractions(client); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index f4c007479c8..731481b13ea 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.SetPriorityAction; +import org.elasticsearch.xpack.core.ilm.SetSingleNodeAllocateStep; import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.ShrinkStep; import org.elasticsearch.xpack.core.ilm.Step; @@ -591,6 +592,61 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); } + public void testSetSingleNodeAllocationRetriesUntilItSucceeds() throws Exception { + int numShards = 2; + int expectedFinalShards = 1; + String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index; + createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + + ensureGreen(index); + + // unallocate all index shards + Request setAllocationToMissingAttribute = new Request("PUT", "/" + index + "/_settings"); + setAllocationToMissingAttribute.setJsonEntity("{\n" + + " \"settings\": {\n" + + " \"index.routing.allocation.include.rack\": \"bogus_rack\"" + + " }\n" + + "}"); + client().performRequest(setAllocationToMissingAttribute); + + ensureHealth(index, (request) -> { + request.addParameter("wait_for_status", "red"); + request.addParameter("timeout", "70s"); + request.addParameter("level", "shards"); + }); + + // assign the policy that'll attempt to shrink the index + createNewSingletonPolicy("warm", new ShrinkAction(expectedFinalShards)); + updatePolicy(index, policy); + + assertTrue("ILM did not start retrying the set-single-node-allocation step", waitUntil(() -> { + try { + Map explainIndexResponse = explainIndex(index); + if (explainIndexResponse == null) { + return false; + } + String failedStep = (String) explainIndexResponse.get("failed_step"); + Integer retryCount = (Integer) explainIndexResponse.get(FAILED_STEP_RETRY_COUNT_FIELD); + return failedStep != null && failedStep.equals(SetSingleNodeAllocateStep.NAME) && retryCount != null && retryCount >= 1; + } catch (IOException e) { + return false; + } + }, 30, TimeUnit.SECONDS)); + + Request resetAllocationForIndex = new Request("PUT", "/" + index + "/_settings"); + resetAllocationForIndex.setJsonEntity("{\n" + + " \"settings\": {\n" + + " \"index.routing.allocation.include.rack\": null" + + " }\n" + + "}"); + client().performRequest(resetAllocationForIndex); + + assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS); + assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index))); + assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()))); + } + public void testFreezeAction() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));