(cherry picked from commit 0e473115958f691fc8dc87293642aea6a07fe3da) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
4eea9c20ee
commit
da2d441d50
|
@ -5,11 +5,12 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -55,18 +56,23 @@ public class SetSingleNodeAllocateStep extends AsyncActionStep {
|
|||
super(key, nextStepKey, client);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRetryable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, ClusterStateObserver observer, Listener listener) {
|
||||
final RoutingNodes routingNodes = clusterState.getRoutingNodes();
|
||||
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, routingNodes, clusterState, null,
|
||||
System.nanoTime());
|
||||
List<String> validNodeIds = new ArrayList<>();
|
||||
String indexName = indexMetaData.getIndex().getName();
|
||||
final Map<ShardId, List<ShardRouting>> routingsByShardId = clusterState.getRoutingTable()
|
||||
.allShards(indexMetaData.getIndex().getName())
|
||||
.allShards(indexName)
|
||||
.stream()
|
||||
.collect(Collectors.groupingBy(ShardRouting::shardId));
|
||||
|
||||
|
||||
if (routingsByShardId.isEmpty() == false) {
|
||||
for (RoutingNode node : routingNodes) {
|
||||
boolean canAllocateOneCopyOfEachShard = routingsByShardId.values().stream() // For each shard
|
||||
|
@ -80,21 +86,24 @@ public class SetSingleNodeAllocateStep extends AsyncActionStep {
|
|||
// Shuffle the list of nodes so the one we pick is random
|
||||
Randomness.shuffle(validNodeIds);
|
||||
Optional<String> nodeId = validNodeIds.stream().findAny();
|
||||
|
||||
if (nodeId.isPresent()) {
|
||||
Settings settings = Settings.builder()
|
||||
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", nodeId.get()).build();
|
||||
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName())
|
||||
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName)
|
||||
.masterNodeTimeout(getMasterTimeout(clusterState))
|
||||
.settings(settings);
|
||||
getClient().admin().indices().updateSettings(updateSettingsRequest,
|
||||
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
|
||||
} else {
|
||||
// No nodes currently match the allocation rules so just wait until there is one that does
|
||||
logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink");
|
||||
listener.onResponse(false);
|
||||
// No nodes currently match the allocation rules, so report this as an error and we'll retry
|
||||
logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink", indexName);
|
||||
listener.onFailure(new NoNodeAvailableException("could not find any nodes to allocate index [" + indexName + "] onto" +
|
||||
" prior to shrink"));
|
||||
}
|
||||
} else {
|
||||
// There are no shards for the index, the index might be gone
|
||||
// There are no shards for the index, the index might be gone. Even though this is a retryable step ILM will not retry in
|
||||
// this case as we're using the periodic loop to trigger the retries and that is run over *existing* indices.
|
||||
listener.onFailure(new IndexNotFoundException(indexMetaData.getIndex()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
|
@ -45,6 +46,7 @@ import java.util.stream.Collectors;
|
|||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSingleNodeAllocateStep> {
|
||||
|
@ -563,22 +565,23 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
|
|||
|
||||
SetSingleNodeAllocateStep step = createRandomInstance();
|
||||
|
||||
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||
SetOnce<Exception> actionCompleted = new SetOnce<>();
|
||||
|
||||
step.performAction(indexMetaData, clusterState, null, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
actionCompleted.set(complete);
|
||||
throw new AssertionError("Unexpected method call");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected method call", e);
|
||||
actionCompleted.set(e);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(false, actionCompleted.get());
|
||||
Exception failure = actionCompleted.get();
|
||||
assertThat(failure, instanceOf(NoNodeAvailableException.class));
|
||||
|
||||
Mockito.verifyZeroInteractions(client);
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
|
|||
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
|
||||
import org.elasticsearch.xpack.core.ilm.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
|
||||
import org.elasticsearch.xpack.core.ilm.SetSingleNodeAllocateStep;
|
||||
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.ilm.ShrinkStep;
|
||||
import org.elasticsearch.xpack.core.ilm.Step;
|
||||
|
@ -591,6 +592,61 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
|
||||
}
|
||||
|
||||
public void testSetSingleNodeAllocationRetriesUntilItSucceeds() throws Exception {
|
||||
int numShards = 2;
|
||||
int expectedFinalShards = 1;
|
||||
String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;
|
||||
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
|
||||
ensureGreen(index);
|
||||
|
||||
// unallocate all index shards
|
||||
Request setAllocationToMissingAttribute = new Request("PUT", "/" + index + "/_settings");
|
||||
setAllocationToMissingAttribute.setJsonEntity("{\n" +
|
||||
" \"settings\": {\n" +
|
||||
" \"index.routing.allocation.include.rack\": \"bogus_rack\"" +
|
||||
" }\n" +
|
||||
"}");
|
||||
client().performRequest(setAllocationToMissingAttribute);
|
||||
|
||||
ensureHealth(index, (request) -> {
|
||||
request.addParameter("wait_for_status", "red");
|
||||
request.addParameter("timeout", "70s");
|
||||
request.addParameter("level", "shards");
|
||||
});
|
||||
|
||||
// assign the policy that'll attempt to shrink the index
|
||||
createNewSingletonPolicy("warm", new ShrinkAction(expectedFinalShards));
|
||||
updatePolicy(index, policy);
|
||||
|
||||
assertTrue("ILM did not start retrying the set-single-node-allocation step", waitUntil(() -> {
|
||||
try {
|
||||
Map<String, Object> explainIndexResponse = explainIndex(index);
|
||||
if (explainIndexResponse == null) {
|
||||
return false;
|
||||
}
|
||||
String failedStep = (String) explainIndexResponse.get("failed_step");
|
||||
Integer retryCount = (Integer) explainIndexResponse.get(FAILED_STEP_RETRY_COUNT_FIELD);
|
||||
return failedStep != null && failedStep.equals(SetSingleNodeAllocateStep.NAME) && retryCount != null && retryCount >= 1;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}, 30, TimeUnit.SECONDS));
|
||||
|
||||
Request resetAllocationForIndex = new Request("PUT", "/" + index + "/_settings");
|
||||
resetAllocationForIndex.setJsonEntity("{\n" +
|
||||
" \"settings\": {\n" +
|
||||
" \"index.routing.allocation.include.rack\": null" +
|
||||
" }\n" +
|
||||
"}");
|
||||
client().performRequest(resetAllocationForIndex);
|
||||
|
||||
assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS);
|
||||
assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index)));
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())));
|
||||
}
|
||||
|
||||
public void testFreezeAction() throws Exception {
|
||||
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
|
|
Loading…
Reference in New Issue