Adds single node allocation to shrink (#30206)
* Adds ClusterState to AsyncActionStep.performAction This is needed so a new step can be created for the shrink action which can select a node to allocate to based on the current routing rules and the node attributes on teh discovery nodes. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/AsyncActionStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/DeleteStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/ForceMergeStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/RolloverStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/ShrinkSetAliasStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/ShrinkStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/UpdateSettingsStep.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/DeleteStepTests.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/ForceMergeStepTests.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/RolloverStepTests.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/ShrinkSetAliasStepTests.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/ShrinkStepTests.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/UpdateSettingsStepTests.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunner.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleService.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunnerTests.java * Adds single node allocation to shrink This change adds two new steps as the first steps of the shrink action: 1. A `SetSingleNodeAllocateStep` which: 1. Determines which of the active nodes match the existing index allocation rules 2. Randomly (using Randomness so its deterministic for testing) picks one of the matching nodes 3. Updates the index settings to add a require allocation rule for the node that was picked (using the `index.routing.allocation.require._name` setting) 2. An `AllocationRoutedStep` which ensures that at least one copy of each shard is allocated according to the new allocation rules Note that this change also modifies the `AllocationRoutedStep` to add a boolean field which determines whether the allocation is complete when at least one copy of each shard matches the allocation rulees or if it needs to wait for all shard copies to be allocated according to the rules. Lastly, a `randomStepKey()` method is added to `AbstractStepTestCase` for convenience. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/AllocateAction.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/AllocationRoutedStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/SetSingleNodeAllocateStep.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/ShrinkAction.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/ShrinkSetAliasStep.java x-pack/plugin/core/src/test/java/org/elasticsearch/action/admin/indices/ settings/put/UpdateSettingsTestHelper.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/AbstractStepTestCase.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/AllocationRoutedStepTests.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/SetSingleNodeAllocateStepTests.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/ShrinkActionTests.java * Fixes AllocationRoutedStep when `waitForAllShardCopies=false` This change fixes `AllocationRoutedStep` so that when `waitForAllShardCopies=false` we wait for any shard copy of each shard to be allocated according to the allocation rules rather than specifically the primary of each shard. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/AllocationRoutedStep.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/AllocationRoutedStepTests.java * Corrects Licence headers and typo x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/SetSingleNodeAllocateStep.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifec ycle/SetSingleNodeAllocateStepTests.java
This commit is contained in:
parent
a6b0f663af
commit
42711dd46d
|
@ -123,7 +123,7 @@ public class AllocateAction implements LifecycleAction {
|
||||||
exclude.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
|
exclude.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
|
||||||
require.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
|
require.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
|
||||||
UpdateSettingsStep allocateStep = new UpdateSettingsStep(allocateKey, allocationRoutedKey, client, newSettings.build());
|
UpdateSettingsStep allocateStep = new UpdateSettingsStep(allocateKey, allocationRoutedKey, client, newSettings.build());
|
||||||
AllocationRoutedStep routedCheckStep = new AllocationRoutedStep(allocationRoutedKey, nextStepKey);
|
AllocationRoutedStep routedCheckStep = new AllocationRoutedStep(allocationRoutedKey, nextStepKey, true);
|
||||||
return Arrays.asList(allocateStep, routedCheckStep);
|
return Arrays.asList(allocateStep, routedCheckStep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,15 +5,19 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
|
|
||||||
|
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.action.support.ActiveShardCount;
|
import org.elasticsearch.action.support.ActiveShardCount;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||||
|
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -21,7 +25,7 @@ import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class AllocationRoutedStep extends ClusterStateWaitStep {
|
public class AllocationRoutedStep extends ClusterStateWaitStep {
|
||||||
public static final String NAME = "check-allocation";
|
public static final String NAME = "check-allocation";
|
||||||
|
@ -31,8 +35,15 @@ public class AllocationRoutedStep extends ClusterStateWaitStep {
|
||||||
private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Settings.EMPTY, Collections.singletonList(
|
private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Settings.EMPTY, Collections.singletonList(
|
||||||
new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))));
|
new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))));
|
||||||
|
|
||||||
AllocationRoutedStep(StepKey key, StepKey nextStepKey) {
|
private boolean waitOnAllShardCopies;
|
||||||
|
|
||||||
|
AllocationRoutedStep(StepKey key, StepKey nextStepKey, boolean waitOnAllShardCopies) {
|
||||||
super(key, nextStepKey);
|
super(key, nextStepKey);
|
||||||
|
this.waitOnAllShardCopies = waitOnAllShardCopies;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getWaitOnAllShardCopies() {
|
||||||
|
return waitOnAllShardCopies;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -51,24 +62,54 @@ public class AllocationRoutedStep extends ClusterStateWaitStep {
|
||||||
// if the allocation has happened
|
// if the allocation has happened
|
||||||
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, clusterState.getRoutingNodes(), clusterState, null,
|
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, clusterState.getRoutingNodes(), clusterState, null,
|
||||||
System.nanoTime());
|
System.nanoTime());
|
||||||
int allocationPendingShards = 0;
|
int allocationPendingAllShards = 0;
|
||||||
List<ShardRouting> allShards = clusterState.getRoutingTable().allShards(index.getName());
|
|
||||||
for (ShardRouting shardRouting : allShards) {
|
ImmutableOpenIntMap<IndexShardRoutingTable> allShards = clusterState.getRoutingTable().index(index).getShards();
|
||||||
|
for (ObjectCursor<IndexShardRoutingTable> shardRoutingTable : allShards.values()) {
|
||||||
|
int allocationPendingThisShard = 0;
|
||||||
|
int shardCopiesThisShard = shardRoutingTable.value.size();
|
||||||
|
for (ShardRouting shardRouting : shardRoutingTable.value.shards()) {
|
||||||
String currentNodeId = shardRouting.currentNodeId();
|
String currentNodeId = shardRouting.currentNodeId();
|
||||||
boolean canRemainOnCurrentNode = ALLOCATION_DECIDERS
|
boolean canRemainOnCurrentNode = ALLOCATION_DECIDERS
|
||||||
.canRemain(shardRouting, clusterState.getRoutingNodes().node(currentNodeId), allocation).type() == Decision.Type.YES;
|
.canRemain(shardRouting, clusterState.getRoutingNodes().node(currentNodeId), allocation)
|
||||||
|
.type() == Decision.Type.YES;
|
||||||
if (canRemainOnCurrentNode == false) {
|
if (canRemainOnCurrentNode == false) {
|
||||||
allocationPendingShards++;
|
allocationPendingThisShard++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (allocationPendingShards > 0) {
|
|
||||||
|
if (waitOnAllShardCopies) {
|
||||||
|
allocationPendingAllShards += allocationPendingThisShard;
|
||||||
|
} else if (shardCopiesThisShard - allocationPendingThisShard == 0) {
|
||||||
|
allocationPendingAllShards++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (allocationPendingAllShards > 0) {
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"[{}] lifecycle action for index [{}] waiting for [{}] shards " + "to be allocated to nodes matching the given filters",
|
"[{}] lifecycle action for index [{}] waiting for [{}] shards " + "to be allocated to nodes matching the given filters",
|
||||||
getKey().getAction(), index, allocationPendingShards);
|
getKey().getAction(), index, allocationPendingAllShards);
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
logger.debug("[{}] lifecycle action for index [{}] complete", getKey().getAction(), index);
|
logger.debug("[{}] lifecycle action for index [{}] complete", getKey().getAction(), index);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(super.hashCode(), waitOnAllShardCopies);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (getClass() != obj.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
AllocationRoutedStep other = (AllocationRoutedStep) obj;
|
||||||
|
return super.equals(obj) &&
|
||||||
|
Objects.equals(waitOnAllShardCopies, other.waitOnAllShardCopies);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
|
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
|
||||||
public abstract class AsyncActionStep extends Step {
|
public abstract class AsyncActionStep extends Step {
|
||||||
|
@ -25,7 +26,7 @@ public abstract class AsyncActionStep extends Step {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract void performAction(IndexMetaData indexMetaData, Listener listener);
|
public abstract void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener);
|
||||||
|
|
||||||
public interface Listener {
|
public interface Listener {
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
|
||||||
public class DeleteStep extends AsyncActionStep {
|
public class DeleteStep extends AsyncActionStep {
|
||||||
|
@ -18,7 +19,7 @@ public class DeleteStep extends AsyncActionStep {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void performAction(IndexMetaData indexMetaData, Listener listener) {
|
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
|
||||||
getClient().admin().indices()
|
getClient().admin().indices()
|
||||||
.delete(new DeleteIndexRequest(indexMetaData.getIndex().getName()),
|
.delete(new DeleteIndexRequest(indexMetaData.getIndex().getName()),
|
||||||
ActionListener.wrap(response -> listener.onResponse(true) , listener::onFailure));
|
ActionListener.wrap(response -> listener.onResponse(true) , listener::onFailure));
|
||||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
|
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -26,7 +27,7 @@ public class ForceMergeStep extends AsyncActionStep {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void performAction(IndexMetaData indexMetaData, Listener listener) {
|
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
|
||||||
ForceMergeRequest request = new ForceMergeRequest(indexMetaData.getIndex().getName());
|
ForceMergeRequest request = new ForceMergeRequest(indexMetaData.getIndex().getName());
|
||||||
request.maxNumSegments(maxNumSegments);
|
request.maxNumSegments(maxNumSegments);
|
||||||
getClient().admin().indices()
|
getClient().admin().indices()
|
||||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
@ -32,7 +33,7 @@ public class RolloverStep extends AsyncActionStep {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void performAction(IndexMetaData indexMetaData, Listener listener) {
|
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
|
||||||
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings());
|
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings());
|
||||||
|
|
||||||
if (Strings.isNullOrEmpty(rolloverAlias)) {
|
if (Strings.isNullOrEmpty(rolloverAlias)) {
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||||
|
import org.elasticsearch.common.Randomness;
|
||||||
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
public class SetSingleNodeAllocateStep extends AsyncActionStep {
|
||||||
|
public static final String NAME = "set-single-node-allocation";
|
||||||
|
|
||||||
|
private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Settings.EMPTY, Collections.singletonList(
|
||||||
|
new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))));
|
||||||
|
|
||||||
|
public SetSingleNodeAllocateStep(StepKey key, StepKey nextStepKey, Client client) {
|
||||||
|
super(key, nextStepKey, client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, Listener listener) {
|
||||||
|
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, clusterState.getRoutingNodes(), clusterState, null,
|
||||||
|
System.nanoTime());
|
||||||
|
List<String> validNodeNames = new ArrayList<>();
|
||||||
|
Optional<ShardRouting> anyShard = clusterState.getRoutingTable().allShards(indexMetaData.getIndex().getName()).stream().findAny();
|
||||||
|
if (anyShard.isPresent()) {
|
||||||
|
// Iterate through the nodes finding ones that are acceptable for the current allocation rules of the shard
|
||||||
|
for (RoutingNode node : clusterState.getRoutingNodes()) {
|
||||||
|
boolean canRemainOnCurrentNode = ALLOCATION_DECIDERS.canRemain(anyShard.get(), node, allocation)
|
||||||
|
.type() == Decision.Type.YES;
|
||||||
|
if (canRemainOnCurrentNode) {
|
||||||
|
DiscoveryNode discoveryNode = node.node();
|
||||||
|
validNodeNames.add(discoveryNode.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Shuffle the list of nodes so the one we pick is random
|
||||||
|
Randomness.shuffle(validNodeNames);
|
||||||
|
Optional<String> nodeName = validNodeNames.stream().findAny();
|
||||||
|
if (nodeName.isPresent()) {
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_name", nodeName.get()).build();
|
||||||
|
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName())
|
||||||
|
.settings(settings);
|
||||||
|
getClient().admin().indices().updateSettings(updateSettingsRequest,
|
||||||
|
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
|
||||||
|
} else {
|
||||||
|
// No nodes currently match the allocation rules so just wait until there is one that does
|
||||||
|
listener.onResponse(false);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// There are no shards for the index, the index might be gone
|
||||||
|
listener.onFailure(new IndexNotFoundException(indexMetaData.getIndex()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -77,16 +77,20 @@ public class ShrinkAction implements LifecycleAction {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
|
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
|
||||||
|
StepKey setSingleNodeKey = new StepKey(phase, NAME, SetSingleNodeAllocateStep.NAME);
|
||||||
|
StepKey allocationRoutedKey = new StepKey(phase, NAME, AllocationRoutedStep.NAME);
|
||||||
StepKey shrinkKey = new StepKey(phase, NAME, ShrinkStep.NAME);
|
StepKey shrinkKey = new StepKey(phase, NAME, ShrinkStep.NAME);
|
||||||
StepKey enoughShardsKey = new StepKey(phase, NAME, ShrunkShardsAllocatedStep.NAME);
|
StepKey enoughShardsKey = new StepKey(phase, NAME, ShrunkShardsAllocatedStep.NAME);
|
||||||
StepKey aliasKey = new StepKey(phase, NAME, ShrinkSetAliasStep.NAME);
|
StepKey aliasKey = new StepKey(phase, NAME, ShrinkSetAliasStep.NAME);
|
||||||
StepKey isShrunkIndexKey = new StepKey(phase, NAME, ShrunkenIndexCheckStep.NAME);
|
StepKey isShrunkIndexKey = new StepKey(phase, NAME, ShrunkenIndexCheckStep.NAME);
|
||||||
|
SetSingleNodeAllocateStep setSingleNodeStep = new SetSingleNodeAllocateStep(setSingleNodeKey, allocationRoutedKey, client);
|
||||||
|
AllocationRoutedStep allocationStep = new AllocationRoutedStep(allocationRoutedKey, shrinkKey, false);
|
||||||
ShrinkStep shrink = new ShrinkStep(shrinkKey, enoughShardsKey, client, numberOfShards, SHRUNKEN_INDEX_PREFIX);
|
ShrinkStep shrink = new ShrinkStep(shrinkKey, enoughShardsKey, client, numberOfShards, SHRUNKEN_INDEX_PREFIX);
|
||||||
ShrunkShardsAllocatedStep allocated = new ShrunkShardsAllocatedStep(enoughShardsKey, aliasKey, numberOfShards,
|
ShrunkShardsAllocatedStep allocated = new ShrunkShardsAllocatedStep(enoughShardsKey, aliasKey, numberOfShards,
|
||||||
SHRUNKEN_INDEX_PREFIX);
|
SHRUNKEN_INDEX_PREFIX);
|
||||||
ShrinkSetAliasStep aliasSwapAndDelete = new ShrinkSetAliasStep(aliasKey, isShrunkIndexKey, client, SHRUNKEN_INDEX_PREFIX);
|
ShrinkSetAliasStep aliasSwapAndDelete = new ShrinkSetAliasStep(aliasKey, isShrunkIndexKey, client, SHRUNKEN_INDEX_PREFIX);
|
||||||
ShrunkenIndexCheckStep waitOnShrinkTakeover = new ShrunkenIndexCheckStep(isShrunkIndexKey, nextStepKey, SHRUNKEN_INDEX_PREFIX);
|
ShrunkenIndexCheckStep waitOnShrinkTakeover = new ShrunkenIndexCheckStep(isShrunkIndexKey, nextStepKey, SHRUNKEN_INDEX_PREFIX);
|
||||||
return Arrays.asList(shrink, allocated, aliasSwapAndDelete, waitOnShrinkTakeover);
|
return Arrays.asList(setSingleNodeStep, allocationStep, shrink, allocated, aliasSwapAndDelete, waitOnShrinkTakeover);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -26,7 +27,7 @@ public class ShrinkSetAliasStep extends AsyncActionStep {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void performAction(IndexMetaData indexMetaData, Listener listener) {
|
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
|
||||||
// get source index
|
// get source index
|
||||||
String index = indexMetaData.getIndex().getName();
|
String index = indexMetaData.getIndex().getName();
|
||||||
// get target shrink index
|
// get target shrink index
|
||||||
|
@ -40,6 +41,11 @@ public class ShrinkSetAliasStep extends AsyncActionStep {
|
||||||
listener.onResponse(true), listener::onFailure));
|
listener.onResponse(true), listener::onFailure));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean indexSurvives() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(super.hashCode(), shrunkIndexPrefix);
|
return Objects.hash(super.hashCode(), shrunkIndexPrefix);
|
||||||
|
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||||
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
|
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
|
||||||
|
@ -35,7 +36,7 @@ public class ShrinkStep extends AsyncActionStep {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void performAction(IndexMetaData indexMetaData, Listener listener) {
|
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
|
||||||
// if operating on the shrunken index, do nothing
|
// if operating on the shrunken index, do nothing
|
||||||
|
|
||||||
Long lifecycleDate = LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE_SETTING.get(indexMetaData.getSettings());
|
Long lifecycleDate = LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE_SETTING.get(indexMetaData.getSettings());
|
||||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
|
||||||
|
@ -24,7 +25,7 @@ public class UpdateSettingsStep extends AsyncActionStep {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void performAction(IndexMetaData indexMetaData, Listener listener) {
|
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
|
||||||
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName()).settings(settings);
|
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName()).settings(settings);
|
||||||
getClient().admin().indices().updateSettings(updateSettingsRequest,
|
getClient().admin().indices().updateSettings(updateSettingsRequest,
|
||||||
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
|
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
|
||||||
|
|
|
@ -8,9 +8,15 @@ package org.elasticsearch.action.admin.indices.settings.put;
|
||||||
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.anyOf;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
public final class UpdateSettingsTestHelper {
|
public final class UpdateSettingsTestHelper {
|
||||||
|
|
||||||
|
@ -23,6 +29,16 @@ public final class UpdateSettingsTestHelper {
|
||||||
assertEquals(expectedSettings, request.settings());
|
assertEquals(expectedSettings, request.settings());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void assertSettingsRequestContainsValueFrom(UpdateSettingsRequest request, String settingsKey,
|
||||||
|
Set<String> acceptableValues, boolean assertOnlyKeyInSettings, String... expectedIndices) {
|
||||||
|
assertNotNull(request);
|
||||||
|
assertArrayEquals(expectedIndices, request.indices());
|
||||||
|
assertThat(request.settings().get(settingsKey), anyOf(acceptableValues.stream().map(e -> equalTo(e)).collect(Collectors.toList())));
|
||||||
|
if (assertOnlyKeyInSettings) {
|
||||||
|
assertEquals(1, request.settings().size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NORELEASE this isn't nice but it's currently the only way to create an
|
// NORELEASE this isn't nice but it's currently the only way to create an
|
||||||
// UpdateSettingsResponse. Need to see if we can make the constructor public
|
// UpdateSettingsResponse. Need to see if we can make the constructor public
|
||||||
// in ES
|
// in ES
|
||||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.EqualsHashCodeTestUtils;
|
import org.elasticsearch.test.EqualsHashCodeTestUtils;
|
||||||
|
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||||
|
|
||||||
public abstract class AbstractStepTestCase<T extends Step> extends ESTestCase {
|
public abstract class AbstractStepTestCase<T extends Step> extends ESTestCase {
|
||||||
|
|
||||||
|
@ -21,4 +22,8 @@ public abstract class AbstractStepTestCase<T extends Step> extends ESTestCase {
|
||||||
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createRandomInstance(), this::copyInstance, this::mutateInstance);
|
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createRandomInstance(), this::copyInstance, this::mutateInstance);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static StepKey randomStepKey() {
|
||||||
|
return new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,34 +33,39 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AllocationRoutedStep createRandomInstance() {
|
public AllocationRoutedStep createRandomInstance() {
|
||||||
StepKey stepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
|
StepKey stepKey = randomStepKey();
|
||||||
StepKey nextStepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
|
StepKey nextStepKey = randomStepKey();
|
||||||
|
boolean waitOnAllShardCopies = randomBoolean();
|
||||||
|
|
||||||
return new AllocationRoutedStep(stepKey, nextStepKey);
|
return new AllocationRoutedStep(stepKey, nextStepKey, waitOnAllShardCopies);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AllocationRoutedStep mutateInstance(AllocationRoutedStep instance) {
|
public AllocationRoutedStep mutateInstance(AllocationRoutedStep instance) {
|
||||||
StepKey key = instance.getKey();
|
StepKey key = instance.getKey();
|
||||||
StepKey nextKey = instance.getNextStepKey();
|
StepKey nextKey = instance.getNextStepKey();
|
||||||
|
boolean waitOnAllShardCopies = instance.getWaitOnAllShardCopies();
|
||||||
|
|
||||||
switch (between(0, 1)) {
|
switch (between(0, 2)) {
|
||||||
case 0:
|
case 0:
|
||||||
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||||
break;
|
break;
|
||||||
case 1:
|
case 1:
|
||||||
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||||
break;
|
break;
|
||||||
|
case 2:
|
||||||
|
waitOnAllShardCopies = waitOnAllShardCopies == false;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new AssertionError("Illegal randomisation branch");
|
throw new AssertionError("Illegal randomisation branch");
|
||||||
}
|
}
|
||||||
|
|
||||||
return new AllocationRoutedStep(key, nextKey);
|
return new AllocationRoutedStep(key, nextKey, waitOnAllShardCopies);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AllocationRoutedStep copyInstance(AllocationRoutedStep instance) {
|
public AllocationRoutedStep copyInstance(AllocationRoutedStep instance) {
|
||||||
return new AllocationRoutedStep(instance.getKey(), instance.getNextStepKey());
|
return new AllocationRoutedStep(instance.getKey(), instance.getNextStepKey(), instance.getWaitOnAllShardCopies());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testConditionMet() {
|
public void testConditionMet() {
|
||||||
|
@ -95,6 +100,40 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
||||||
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, true);
|
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testConditionMetOnlyOneCopyAllocated() {
|
||||||
|
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||||
|
Map<String, String> includes = AllocateActionTests.randomMap(1, 5);
|
||||||
|
Map<String, String> excludes = AllocateActionTests.randomMap(1, 5);
|
||||||
|
Map<String, String> requires = AllocateActionTests.randomMap(1, 5);
|
||||||
|
Settings.Builder existingSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT.id)
|
||||||
|
.put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID());
|
||||||
|
Settings.Builder expectedSettings = Settings.builder();
|
||||||
|
Settings.Builder node1Settings = Settings.builder();
|
||||||
|
Settings.Builder node2Settings = Settings.builder();
|
||||||
|
includes.forEach((k, v) -> {
|
||||||
|
existingSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
|
||||||
|
});
|
||||||
|
excludes.forEach((k, v) -> {
|
||||||
|
existingSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
});
|
||||||
|
requires.forEach((k, v) -> {
|
||||||
|
existingSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
|
||||||
|
});
|
||||||
|
boolean primaryOnNode1 = randomBoolean();
|
||||||
|
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||||
|
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", primaryOnNode1, ShardRoutingState.STARTED))
|
||||||
|
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node2", primaryOnNode1 == false,
|
||||||
|
ShardRoutingState.STARTED));
|
||||||
|
|
||||||
|
AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey(), false);
|
||||||
|
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, true);
|
||||||
|
}
|
||||||
|
|
||||||
public void testExecuteAllocateNotComplete() throws Exception {
|
public void testExecuteAllocateNotComplete() throws Exception {
|
||||||
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||||
Map<String, String> includes = AllocateActionTests.randomMap(1, 5);
|
Map<String, String> includes = AllocateActionTests.randomMap(1, 5);
|
||||||
|
@ -128,6 +167,41 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
||||||
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, false);
|
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testExecuteAllocateNotCompleteOnlyOneCopyAllocated() throws Exception {
|
||||||
|
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||||
|
Map<String, String> includes = AllocateActionTests.randomMap(1, 5);
|
||||||
|
Map<String, String> excludes = AllocateActionTests.randomMap(1, 5);
|
||||||
|
Map<String, String> requires = AllocateActionTests.randomMap(1, 5);
|
||||||
|
Settings.Builder existingSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT.id)
|
||||||
|
.put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID());
|
||||||
|
Settings.Builder expectedSettings = Settings.builder();
|
||||||
|
Settings.Builder node1Settings = Settings.builder();
|
||||||
|
Settings.Builder node2Settings = Settings.builder();
|
||||||
|
includes.forEach((k, v) -> {
|
||||||
|
existingSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
|
||||||
|
});
|
||||||
|
excludes.forEach((k, v) -> {
|
||||||
|
existingSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
});
|
||||||
|
requires.forEach((k, v) -> {
|
||||||
|
existingSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
|
||||||
|
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
|
||||||
|
});
|
||||||
|
|
||||||
|
boolean primaryOnNode1 = randomBoolean();
|
||||||
|
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||||
|
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", primaryOnNode1, ShardRoutingState.STARTED))
|
||||||
|
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node2", primaryOnNode1 == false,
|
||||||
|
ShardRoutingState.STARTED));
|
||||||
|
|
||||||
|
AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey(), true);
|
||||||
|
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, false);
|
||||||
|
}
|
||||||
|
|
||||||
public void testExecuteAllocateUnassigned() throws Exception {
|
public void testExecuteAllocateUnassigned() throws Exception {
|
||||||
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||||
Map<String, String> includes = AllocateActionTests.randomMap(1, 5);
|
Map<String, String> includes = AllocateActionTests.randomMap(1, 5);
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class DeleteStepTests extends AbstractStepTestCase<DeleteStep> {
|
||||||
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||||
|
|
||||||
DeleteStep step = createRandomInstance();
|
DeleteStep step = createRandomInstance();
|
||||||
step.performAction(indexMetaData, new AsyncActionStep.Listener() {
|
step.performAction(indexMetaData, null, new AsyncActionStep.Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
actionCompleted.set(complete);
|
actionCompleted.set(complete);
|
||||||
|
@ -138,7 +138,7 @@ public class DeleteStepTests extends AbstractStepTestCase<DeleteStep> {
|
||||||
|
|
||||||
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
||||||
DeleteStep step = createRandomInstance();
|
DeleteStep step = createRandomInstance();
|
||||||
step.performAction(indexMetaData, new AsyncActionStep.Listener() {
|
step.performAction(indexMetaData, null, new AsyncActionStep.Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
throw new AssertionError("Unexpected method call");
|
throw new AssertionError("Unexpected method call");
|
||||||
|
|
|
@ -88,7 +88,7 @@ public class ForceMergeStepTests extends AbstractStepTestCase<ForceMergeStep> {
|
||||||
|
|
||||||
ForceMergeStep step = new ForceMergeStep(stepKey, nextStepKey, client, maxNumSegments);
|
ForceMergeStep step = new ForceMergeStep(stepKey, nextStepKey, client, maxNumSegments);
|
||||||
SetOnce<Boolean> completed = new SetOnce<>();
|
SetOnce<Boolean> completed = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new AsyncActionStep.Listener() {
|
step.performAction(indexMetaData, null, new AsyncActionStep.Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
completed.set(complete);
|
completed.set(complete);
|
||||||
|
@ -129,7 +129,7 @@ public class ForceMergeStepTests extends AbstractStepTestCase<ForceMergeStep> {
|
||||||
|
|
||||||
ForceMergeStep step = new ForceMergeStep(stepKey, nextStepKey, client, maxNumSegments);
|
ForceMergeStep step = new ForceMergeStep(stepKey, nextStepKey, client, maxNumSegments);
|
||||||
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new AsyncActionStep.Listener() {
|
step.performAction(indexMetaData, null, new AsyncActionStep.Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
throw new AssertionError("unexpected method call");
|
throw new AssertionError("unexpected method call");
|
||||||
|
|
|
@ -136,7 +136,7 @@ public class RolloverStepTests extends AbstractStepTestCase<RolloverStep> {
|
||||||
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
|
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
@ -193,7 +193,7 @@ public class RolloverStepTests extends AbstractStepTestCase<RolloverStep> {
|
||||||
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
|
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
@ -251,7 +251,7 @@ public class RolloverStepTests extends AbstractStepTestCase<RolloverStep> {
|
||||||
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
|
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
@ -280,7 +280,7 @@ public class RolloverStepTests extends AbstractStepTestCase<RolloverStep> {
|
||||||
RolloverStep step = createRandomInstance();
|
RolloverStep step = createRandomInstance();
|
||||||
|
|
||||||
SetOnce<Exception> exceptionThrown = new SetOnce<>();
|
SetOnce<Exception> exceptionThrown = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
throw new AssertionError("Unexpected method call");
|
throw new AssertionError("Unexpected method call");
|
||||||
|
|
|
@ -0,0 +1,411 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.SetOnce;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||||
|
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
|
||||||
|
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper;
|
||||||
|
import org.elasticsearch.client.AdminClient;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.client.IndicesAdminClient;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||||
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.settings.Settings.Builder;
|
||||||
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
|
import org.elasticsearch.index.Index;
|
||||||
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.node.Node;
|
||||||
|
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep.Listener;
|
||||||
|
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSingleNodeAllocateStep> {
|
||||||
|
|
||||||
|
private Client client;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
client = Mockito.mock(Client.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SetSingleNodeAllocateStep createRandomInstance() {
|
||||||
|
return new SetSingleNodeAllocateStep(randomStepKey(), randomStepKey(), client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SetSingleNodeAllocateStep mutateInstance(SetSingleNodeAllocateStep instance) {
|
||||||
|
StepKey key = instance.getKey();
|
||||||
|
StepKey nextKey = instance.getNextStepKey();
|
||||||
|
|
||||||
|
switch (between(0, 1)) {
|
||||||
|
case 0:
|
||||||
|
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new AssertionError("Illegal randomisation branch");
|
||||||
|
}
|
||||||
|
|
||||||
|
return new SetSingleNodeAllocateStep(key, nextKey, instance.getClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SetSingleNodeAllocateStep copyInstance(SetSingleNodeAllocateStep instance) {
|
||||||
|
return new SetSingleNodeAllocateStep(instance.getKey(), instance.getNextStepKey(), client);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testPerformActionNoAttrs() {
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(settings(Version.CURRENT))
|
||||||
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
|
Index index = indexMetaData.getIndex();
|
||||||
|
Set<String> validNodeNames = new HashSet<>();
|
||||||
|
Settings validNodeSettings = Settings.EMPTY;
|
||||||
|
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
|
||||||
|
int numNodes = randomIntBetween(1, 20);
|
||||||
|
for (int i = 0; i < numNodes; i++) {
|
||||||
|
String nodeId = "node_id_" + i;
|
||||||
|
String nodeName = "node_" + i;
|
||||||
|
int nodePort = 9300 + i;
|
||||||
|
Settings nodeSettings = Settings.builder().put(validNodeSettings).put("node.name", nodeName).build();
|
||||||
|
nodes.add(
|
||||||
|
DiscoveryNode.createLocal(nodeSettings, new TransportAddress(TransportAddress.META_ADDRESS, nodePort), nodeId));
|
||||||
|
validNodeNames.add(nodeName);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNodeSelected(indexMetaData, index, validNodeNames, nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testPerformActionAttrsAllNodesValid() {
|
||||||
|
int numAttrs = randomIntBetween(1, 10);
|
||||||
|
String[][] validAttrs = new String[numAttrs][2];
|
||||||
|
for (int i = 0; i < numAttrs; i++) {
|
||||||
|
validAttrs[i] = new String[] { randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20) };
|
||||||
|
}
|
||||||
|
Settings.Builder indexSettings = settings(Version.CURRENT);
|
||||||
|
for (String[] attr : validAttrs) {
|
||||||
|
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + attr[0], attr[1]);
|
||||||
|
}
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
|
||||||
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
|
Index index = indexMetaData.getIndex();
|
||||||
|
Set<String> validNodeNames = new HashSet<>();
|
||||||
|
Settings validNodeSettings = Settings.EMPTY;
|
||||||
|
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
|
||||||
|
int numNodes = randomIntBetween(1, 20);
|
||||||
|
for (int i = 0; i < numNodes; i++) {
|
||||||
|
String nodeId = "node_id_" + i;
|
||||||
|
String nodeName = "node_" + i;
|
||||||
|
int nodePort = 9300 + i;
|
||||||
|
String[] nodeAttr = randomFrom(validAttrs);
|
||||||
|
Settings nodeSettings = Settings.builder().put(validNodeSettings).put(Node.NODE_NAME_SETTING.getKey(), nodeName)
|
||||||
|
.put(Node.NODE_ATTRIBUTES.getKey() + nodeAttr[0], nodeAttr[1]).build();
|
||||||
|
nodes.add(DiscoveryNode.createLocal(nodeSettings, new TransportAddress(TransportAddress.META_ADDRESS, nodePort), nodeId));
|
||||||
|
validNodeNames.add(nodeName);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNodeSelected(indexMetaData, index, validNodeNames, nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testPerformActionAttrsSomeNodesValid() {
|
||||||
|
String[] validAttr = new String[] { "box_type", "valid" };
|
||||||
|
String[] invalidAttr = new String[] { "box_type", "not_valid" };
|
||||||
|
Settings.Builder indexSettings = settings(Version.CURRENT);
|
||||||
|
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + validAttr[0], validAttr[1]);
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
|
||||||
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
|
Index index = indexMetaData.getIndex();
|
||||||
|
Set<String> validNodeNames = new HashSet<>();
|
||||||
|
Settings validNodeSettings = Settings.builder().put(Node.NODE_ATTRIBUTES.getKey() + validAttr[0], validAttr[1]).build();
|
||||||
|
Settings invalidNodeSettings = Settings.builder().put(Node.NODE_ATTRIBUTES.getKey() + invalidAttr[0], invalidAttr[1]).build();
|
||||||
|
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
|
||||||
|
int numNodes = randomIntBetween(1, 20);
|
||||||
|
for (int i = 0; i < numNodes; i++) {
|
||||||
|
String nodeId = "node_id_" + i;
|
||||||
|
String nodeName = "node_" + i;
|
||||||
|
int nodePort = 9300 + i;
|
||||||
|
Builder nodeSettingsBuilder = Settings.builder();
|
||||||
|
// randomise whether the node had valid attributes or not but make sure at least one node is valid
|
||||||
|
if (randomBoolean() || (i == numNodes - 1 && validNodeNames.isEmpty())) {
|
||||||
|
nodeSettingsBuilder.put(validNodeSettings).put(Node.NODE_NAME_SETTING.getKey(), nodeName);
|
||||||
|
validNodeNames.add(nodeName);
|
||||||
|
} else {
|
||||||
|
nodeSettingsBuilder.put(invalidNodeSettings).put(Node.NODE_NAME_SETTING.getKey(), nodeName);
|
||||||
|
}
|
||||||
|
nodes.add(DiscoveryNode.createLocal(nodeSettingsBuilder.build(), new TransportAddress(TransportAddress.META_ADDRESS, nodePort),
|
||||||
|
nodeId));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNodeSelected(indexMetaData, index, validNodeNames, nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testPerformActionAttrsNoNodesValid() {
|
||||||
|
String[] validAttr = new String[] { "box_type", "valid" };
|
||||||
|
String[] invalidAttr = new String[] { "box_type", "not_valid" };
|
||||||
|
Settings.Builder indexSettings = settings(Version.CURRENT);
|
||||||
|
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + validAttr[0], validAttr[1]);
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
|
||||||
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
|
Index index = indexMetaData.getIndex();
|
||||||
|
Settings invalidNodeSettings = Settings.builder().put(Node.NODE_ATTRIBUTES.getKey() + invalidAttr[0], invalidAttr[1]).build();
|
||||||
|
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
|
||||||
|
int numNodes = randomIntBetween(1, 20);
|
||||||
|
for (int i = 0; i < numNodes; i++) {
|
||||||
|
String nodeId = "node_id_" + i;
|
||||||
|
String nodeName = "node_" + i;
|
||||||
|
int nodePort = 9300 + i;
|
||||||
|
Builder nodeSettingsBuilder = Settings.builder().put(invalidNodeSettings).put(Node.NODE_NAME_SETTING.getKey(), nodeName);
|
||||||
|
nodes.add(DiscoveryNode.createLocal(nodeSettingsBuilder.build(), new TransportAddress(TransportAddress.META_ADDRESS, nodePort),
|
||||||
|
nodeId));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNoValidNode(indexMetaData, index, nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testPerformActionAttrsRequestFails() {
|
||||||
|
int numAttrs = randomIntBetween(1, 10);
|
||||||
|
String[][] validAttrs = new String[numAttrs][2];
|
||||||
|
for (int i = 0; i < numAttrs; i++) {
|
||||||
|
validAttrs[i] = new String[] { randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20) };
|
||||||
|
}
|
||||||
|
Settings.Builder indexSettings = settings(Version.CURRENT);
|
||||||
|
for (String[] attr : validAttrs) {
|
||||||
|
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + attr[0], attr[1]);
|
||||||
|
}
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
|
||||||
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
|
Index index = indexMetaData.getIndex();
|
||||||
|
Set<String> validNodeNames = new HashSet<>();
|
||||||
|
Settings validNodeSettings = Settings.EMPTY;
|
||||||
|
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
|
||||||
|
int numNodes = randomIntBetween(1, 20);
|
||||||
|
for (int i = 0; i < numNodes; i++) {
|
||||||
|
String nodeId = "node_id_" + i;
|
||||||
|
String nodeName = "node_" + i;
|
||||||
|
int nodePort = 9300 + i;
|
||||||
|
String[] nodeAttr = randomFrom(validAttrs);
|
||||||
|
Settings nodeSettings = Settings.builder().put(validNodeSettings).put(Node.NODE_NAME_SETTING.getKey(), nodeName)
|
||||||
|
.put(Node.NODE_ATTRIBUTES.getKey() + nodeAttr[0], nodeAttr[1]).build();
|
||||||
|
nodes.add(DiscoveryNode.createLocal(nodeSettings, new TransportAddress(TransportAddress.META_ADDRESS, nodePort), nodeId));
|
||||||
|
validNodeNames.add(nodeName);
|
||||||
|
}
|
||||||
|
|
||||||
|
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
|
||||||
|
indexMetaData);
|
||||||
|
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||||
|
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node_id_1", true, ShardRoutingState.STARTED));
|
||||||
|
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
|
||||||
|
.nodes(nodes).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
|
||||||
|
|
||||||
|
SetSingleNodeAllocateStep step = createRandomInstance();
|
||||||
|
Exception exception = new RuntimeException();
|
||||||
|
|
||||||
|
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||||
|
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||||
|
|
||||||
|
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||||
|
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||||
|
Mockito.doAnswer(new Answer<Void>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||||
|
UpdateSettingsTestHelper.assertSettingsRequestContainsValueFrom(request,
|
||||||
|
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_name", validNodeNames, true,
|
||||||
|
indexMetaData.getIndex().getName());
|
||||||
|
listener.onFailure(exception);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
|
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
||||||
|
step.performAction(indexMetaData, clusterState, new Listener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResponse(boolean complete) {
|
||||||
|
throw new AssertionError("Unexpected method call");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
assertSame(exception, e);
|
||||||
|
exceptionThrown.set(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals(true, exceptionThrown.get());
|
||||||
|
|
||||||
|
Mockito.verify(client, Mockito.only()).admin();
|
||||||
|
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||||
|
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testPerformActionAttrsNoShard() {
|
||||||
|
int numAttrs = randomIntBetween(1, 10);
|
||||||
|
String[][] validAttrs = new String[numAttrs][2];
|
||||||
|
for (int i = 0; i < numAttrs; i++) {
|
||||||
|
validAttrs[i] = new String[] { randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20) };
|
||||||
|
}
|
||||||
|
Settings.Builder indexSettings = settings(Version.CURRENT);
|
||||||
|
for (String[] attr : validAttrs) {
|
||||||
|
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + attr[0], attr[1]);
|
||||||
|
}
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
|
||||||
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
|
Index index = indexMetaData.getIndex();
|
||||||
|
Set<String> validNodeNames = new HashSet<>();
|
||||||
|
Settings validNodeSettings = Settings.EMPTY;
|
||||||
|
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
|
||||||
|
int numNodes = randomIntBetween(1, 20);
|
||||||
|
for (int i = 0; i < numNodes; i++) {
|
||||||
|
String nodeId = "node_id_" + i;
|
||||||
|
String nodeName = "node_" + i;
|
||||||
|
int nodePort = 9300 + i;
|
||||||
|
String[] nodeAttr = randomFrom(validAttrs);
|
||||||
|
Settings nodeSettings = Settings.builder().put(validNodeSettings).put(Node.NODE_NAME_SETTING.getKey(), nodeName)
|
||||||
|
.put(Node.NODE_ATTRIBUTES.getKey() + nodeAttr[0], nodeAttr[1]).build();
|
||||||
|
nodes.add(DiscoveryNode.createLocal(nodeSettings, new TransportAddress(TransportAddress.META_ADDRESS, nodePort), nodeId));
|
||||||
|
validNodeNames.add(nodeName);
|
||||||
|
}
|
||||||
|
|
||||||
|
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
|
||||||
|
indexMetaData);
|
||||||
|
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index);
|
||||||
|
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
|
||||||
|
.nodes(nodes).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
|
||||||
|
|
||||||
|
SetSingleNodeAllocateStep step = createRandomInstance();
|
||||||
|
|
||||||
|
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
||||||
|
step.performAction(indexMetaData, clusterState, new Listener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResponse(boolean complete) {
|
||||||
|
throw new AssertionError("Unexpected method call");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
assertThat(e, Matchers.instanceOf(IndexNotFoundException.class));
|
||||||
|
assertEquals(indexMetaData.getIndex(), ((IndexNotFoundException) e).getIndex());
|
||||||
|
exceptionThrown.set(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals(true, exceptionThrown.get());
|
||||||
|
|
||||||
|
Mockito.verifyZeroInteractions(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertNodeSelected(IndexMetaData indexMetaData, Index index, Set<String> validNodeNames, DiscoveryNodes.Builder nodes) {
|
||||||
|
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
|
||||||
|
indexMetaData);
|
||||||
|
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||||
|
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node_id_1", true, ShardRoutingState.STARTED));
|
||||||
|
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
|
||||||
|
.nodes(nodes).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
|
||||||
|
|
||||||
|
SetSingleNodeAllocateStep step = createRandomInstance();
|
||||||
|
|
||||||
|
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||||
|
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||||
|
|
||||||
|
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||||
|
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||||
|
Mockito.doAnswer(new Answer<Void>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||||
|
UpdateSettingsTestHelper.assertSettingsRequestContainsValueFrom(request,
|
||||||
|
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_name", validNodeNames, true,
|
||||||
|
indexMetaData.getIndex().getName());
|
||||||
|
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
|
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||||
|
|
||||||
|
step.performAction(indexMetaData, clusterState, new Listener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResponse(boolean complete) {
|
||||||
|
actionCompleted.set(complete);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
throw new AssertionError("Unexpected method call", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals(true, actionCompleted.get());
|
||||||
|
|
||||||
|
Mockito.verify(client, Mockito.only()).admin();
|
||||||
|
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||||
|
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertNoValidNode(IndexMetaData indexMetaData, Index index, DiscoveryNodes.Builder nodes) {
|
||||||
|
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
|
||||||
|
indexMetaData);
|
||||||
|
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||||
|
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node_id_1", true, ShardRoutingState.STARTED));
|
||||||
|
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
|
||||||
|
.nodes(nodes).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
|
||||||
|
|
||||||
|
SetSingleNodeAllocateStep step = createRandomInstance();
|
||||||
|
|
||||||
|
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||||
|
|
||||||
|
step.performAction(indexMetaData, clusterState, new Listener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResponse(boolean complete) {
|
||||||
|
actionCompleted.set(complete);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
throw new AssertionError("Unexpected method call", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals(false, actionCompleted.get());
|
||||||
|
|
||||||
|
Mockito.verifyZeroInteractions(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -48,24 +48,37 @@ public class ShrinkActionTests extends AbstractSerializingTestCase<ShrinkAction>
|
||||||
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
|
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
|
||||||
randomAlphaOfLengthBetween(1, 10));
|
randomAlphaOfLengthBetween(1, 10));
|
||||||
List<Step> steps = action.toSteps(null, phase, nextStepKey);
|
List<Step> steps = action.toSteps(null, phase, nextStepKey);
|
||||||
assertThat(steps.size(), equalTo(4));
|
assertThat(steps.size(), equalTo(6));
|
||||||
StepKey expectedFirstKey = new StepKey(phase, ShrinkAction.NAME, ShrinkStep.NAME);
|
StepKey expectedFirstKey = new StepKey(phase, ShrinkAction.NAME, SetSingleNodeAllocateStep.NAME);
|
||||||
StepKey expectedSecondKey = new StepKey(phase, ShrinkAction.NAME, ShrunkShardsAllocatedStep.NAME);
|
StepKey expectedSecondKey = new StepKey(phase, ShrinkAction.NAME, AllocationRoutedStep.NAME);
|
||||||
StepKey expectedThirdKey = new StepKey(phase, ShrinkAction.NAME, ShrinkSetAliasStep.NAME);
|
StepKey expectedThirdKey = new StepKey(phase, ShrinkAction.NAME, ShrinkStep.NAME);
|
||||||
StepKey expectedFourthKey = new StepKey(phase, ShrinkAction.NAME, ShrunkenIndexCheckStep.NAME);
|
StepKey expectedFourthKey = new StepKey(phase, ShrinkAction.NAME, ShrunkShardsAllocatedStep.NAME);
|
||||||
assertTrue(steps.get(0) instanceof ShrinkStep);
|
StepKey expectedFifthKey = new StepKey(phase, ShrinkAction.NAME, ShrinkSetAliasStep.NAME);
|
||||||
|
StepKey expectedSixthKey = new StepKey(phase, ShrinkAction.NAME, ShrunkenIndexCheckStep.NAME);
|
||||||
|
assertTrue(steps.get(0) instanceof SetSingleNodeAllocateStep);
|
||||||
assertThat(steps.get(0).getKey(), equalTo(expectedFirstKey));
|
assertThat(steps.get(0).getKey(), equalTo(expectedFirstKey));
|
||||||
assertThat(((ShrinkStep) steps.get(0)).getNumberOfShards(), equalTo(action.getNumberOfShards()));
|
assertThat(steps.get(0).getNextStepKey(), equalTo(expectedSecondKey));
|
||||||
assertThat(((ShrinkStep) steps.get(0)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
assertTrue(steps.get(1) instanceof AllocationRoutedStep);
|
||||||
assertTrue(steps.get(1) instanceof ShrunkShardsAllocatedStep);
|
|
||||||
assertThat(steps.get(1).getKey(), equalTo(expectedSecondKey));
|
assertThat(steps.get(1).getKey(), equalTo(expectedSecondKey));
|
||||||
assertThat(((ShrunkShardsAllocatedStep) steps.get(1)).getNumberOfShards(), equalTo(action.getNumberOfShards()));
|
assertThat(steps.get(1).getNextStepKey(), equalTo(expectedThirdKey));
|
||||||
assertThat(((ShrunkShardsAllocatedStep) steps.get(1)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
assertThat(((AllocationRoutedStep) steps.get(1)).getWaitOnAllShardCopies(), equalTo(false));
|
||||||
assertTrue(steps.get(2) instanceof ShrinkSetAliasStep);
|
assertTrue(steps.get(2) instanceof ShrinkStep);
|
||||||
assertThat(steps.get(2).getKey(), equalTo(expectedThirdKey));
|
assertThat(steps.get(2).getKey(), equalTo(expectedThirdKey));
|
||||||
assertThat(((ShrinkSetAliasStep) steps.get(2)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
assertThat(steps.get(2).getNextStepKey(), equalTo(expectedFourthKey));
|
||||||
assertTrue(steps.get(3) instanceof ShrunkenIndexCheckStep);
|
assertThat(((ShrinkStep) steps.get(2)).getNumberOfShards(), equalTo(action.getNumberOfShards()));
|
||||||
|
assertThat(((ShrinkStep) steps.get(2)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||||
|
assertTrue(steps.get(3) instanceof ShrunkShardsAllocatedStep);
|
||||||
assertThat(steps.get(3).getKey(), equalTo(expectedFourthKey));
|
assertThat(steps.get(3).getKey(), equalTo(expectedFourthKey));
|
||||||
assertThat(((ShrunkenIndexCheckStep) steps.get(3)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
assertThat(steps.get(3).getNextStepKey(), equalTo(expectedFifthKey));
|
||||||
|
assertThat(((ShrunkShardsAllocatedStep) steps.get(3)).getNumberOfShards(), equalTo(action.getNumberOfShards()));
|
||||||
|
assertThat(((ShrunkShardsAllocatedStep) steps.get(3)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||||
|
assertTrue(steps.get(4) instanceof ShrinkSetAliasStep);
|
||||||
|
assertThat(steps.get(4).getKey(), equalTo(expectedFifthKey));
|
||||||
|
assertThat(steps.get(4).getNextStepKey(), equalTo(expectedSixthKey));
|
||||||
|
assertThat(((ShrinkSetAliasStep) steps.get(4)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||||
|
assertTrue(steps.get(5) instanceof ShrunkenIndexCheckStep);
|
||||||
|
assertThat(steps.get(5).getKey(), equalTo(expectedSixthKey));
|
||||||
|
assertThat(steps.get(5).getNextStepKey(), equalTo(nextStepKey));
|
||||||
|
assertThat(((ShrunkenIndexCheckStep) steps.get(5)).getShrunkIndexPrefix(), equalTo(ShrinkAction.SHRUNKEN_INDEX_PREFIX));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,7 @@ public class ShrinkSetAliasStepTests extends AbstractStepTestCase<ShrinkSetAlias
|
||||||
}).when(indicesClient).aliases(Mockito.any(), Mockito.any());
|
}).when(indicesClient).aliases(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
@ -148,7 +148,7 @@ public class ShrinkSetAliasStepTests extends AbstractStepTestCase<ShrinkSetAlias
|
||||||
}).when(indicesClient).aliases(Mockito.any(), Mockito.any());
|
}).when(indicesClient).aliases(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
|
|
@ -133,7 +133,7 @@ public class ShrinkStepTests extends AbstractStepTestCase<ShrinkStep> {
|
||||||
}).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any());
|
}).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||||
step.performAction(sourceIndexMetaData, new Listener() {
|
step.performAction(sourceIndexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
@ -178,7 +178,7 @@ public class ShrinkStepTests extends AbstractStepTestCase<ShrinkStep> {
|
||||||
}).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any());
|
}).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
@ -222,7 +222,7 @@ public class ShrinkStepTests extends AbstractStepTestCase<ShrinkStep> {
|
||||||
}).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any());
|
}).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
|
|
@ -97,7 +97,7 @@ public class UpdateSettingsStepTests extends AbstractStepTestCase<UpdateSettings
|
||||||
|
|
||||||
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||||
|
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
@ -143,7 +143,7 @@ public class UpdateSettingsStepTests extends AbstractStepTestCase<UpdateSettings
|
||||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
||||||
step.performAction(indexMetaData, new Listener() {
|
step.performAction(indexMetaData, null, new Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
|
|
@ -38,7 +38,9 @@ public class IndexLifecycleRunner {
|
||||||
this.nowSupplier = nowSupplier;
|
this.nowSupplier = nowSupplier;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void runPolicy(String policy, IndexMetaData indexMetaData, Settings indexSettings, boolean fromClusterStateChange) {
|
public void runPolicy(String policy, IndexMetaData indexMetaData, ClusterState currentState,
|
||||||
|
boolean fromClusterStateChange) {
|
||||||
|
Settings indexSettings = indexMetaData.getSettings();
|
||||||
Step currentStep = getCurrentStep(stepRegistry, policy, indexSettings);
|
Step currentStep = getCurrentStep(stepRegistry, policy, indexSettings);
|
||||||
logger.warn("running policy with current-step[" + currentStep.getKey() + "]");
|
logger.warn("running policy with current-step[" + currentStep.getKey() + "]");
|
||||||
if (currentStep instanceof TerminalPolicyStep) {
|
if (currentStep instanceof TerminalPolicyStep) {
|
||||||
|
@ -66,7 +68,7 @@ public class IndexLifecycleRunner {
|
||||||
}
|
}
|
||||||
} else if (currentStep instanceof AsyncActionStep) {
|
} else if (currentStep instanceof AsyncActionStep) {
|
||||||
if (fromClusterStateChange == false) {
|
if (fromClusterStateChange == false) {
|
||||||
((AsyncActionStep) currentStep).performAction(indexMetaData, new AsyncActionStep.Listener() {
|
((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new AsyncActionStep.Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
|
@ -88,10 +90,10 @@ public class IndexLifecycleRunner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runPolicy(IndexMetaData indexMetaData) {
|
private void runPolicy(IndexMetaData indexMetaData, ClusterState currentState) {
|
||||||
Settings indexSettings = indexMetaData.getSettings();
|
Settings indexSettings = indexMetaData.getSettings();
|
||||||
String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
|
String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
|
||||||
runPolicy(policy, indexMetaData, indexSettings, false);
|
runPolicy(policy, indexMetaData, currentState, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeClusterStateSteps(Index index, String policy, Step step) {
|
private void executeClusterStateSteps(Index index, String policy, Step step) {
|
||||||
|
@ -154,6 +156,6 @@ public class IndexLifecycleRunner {
|
||||||
logger.error("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
|
logger.error("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
|
||||||
+ nextStepKey);
|
+ nextStepKey);
|
||||||
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey,
|
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey,
|
||||||
nextStepKey, nowSupplier, newState -> runPolicy(newState.getMetaData().index(index))));
|
nextStepKey, nowSupplier, newState -> runPolicy(newState.getMetaData().index(index), newState)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,7 +150,7 @@ public class IndexLifecycleService extends AbstractComponent
|
||||||
clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> {
|
clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> {
|
||||||
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
|
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
|
||||||
if (Strings.isNullOrEmpty(policyName) == false) {
|
if (Strings.isNullOrEmpty(policyName) == false) {
|
||||||
lifecycleRunner.runPolicy(policyName, idxMeta, idxMeta.getSettings(), fromClusterStateChange);
|
lifecycleRunner.runPolicy(policyName, idxMeta, clusterState, fromClusterStateChange);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,9 +53,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, false);
|
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||||
|
|
||||||
Mockito.verifyZeroInteractions(clusterService);
|
Mockito.verifyZeroInteractions(clusterService);
|
||||||
}
|
}
|
||||||
|
@ -69,9 +68,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, randomBoolean());
|
runner.runPolicy(policyName, indexMetaData, null, randomBoolean());
|
||||||
|
|
||||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||||
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step)));
|
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step)));
|
||||||
|
@ -88,9 +86,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, randomBoolean());
|
runner.runPolicy(policyName, indexMetaData, null, randomBoolean());
|
||||||
|
|
||||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||||
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step)));
|
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step)));
|
||||||
|
@ -107,9 +104,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, false);
|
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||||
|
|
||||||
assertEquals(1, step.getExecuteCount());
|
assertEquals(1, step.getExecuteCount());
|
||||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||||
|
@ -128,9 +124,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, false);
|
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||||
|
|
||||||
assertEquals(1, step.getExecuteCount());
|
assertEquals(1, step.getExecuteCount());
|
||||||
Mockito.verifyZeroInteractions(clusterService);
|
Mockito.verifyZeroInteractions(clusterService);
|
||||||
|
@ -146,9 +141,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, false);
|
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||||
|
|
||||||
assertEquals(1, step.getExecuteCount());
|
assertEquals(1, step.getExecuteCount());
|
||||||
Mockito.verifyZeroInteractions(clusterService);
|
Mockito.verifyZeroInteractions(clusterService);
|
||||||
|
@ -165,10 +159,9 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
RuntimeException exception = expectThrows(RuntimeException.class,
|
RuntimeException exception = expectThrows(RuntimeException.class,
|
||||||
() -> runner.runPolicy(policyName, indexMetaData, indexSettings, false));
|
() -> runner.runPolicy(policyName, indexMetaData, null, false));
|
||||||
|
|
||||||
assertSame(expectedException, exception.getCause());
|
assertSame(expectedException, exception.getCause());
|
||||||
assertEquals(1, step.getExecuteCount());
|
assertEquals(1, step.getExecuteCount());
|
||||||
|
@ -186,9 +179,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, true);
|
runner.runPolicy(policyName, indexMetaData, null, true);
|
||||||
|
|
||||||
assertEquals(0, step.getExecuteCount());
|
assertEquals(0, step.getExecuteCount());
|
||||||
Mockito.verifyZeroInteractions(clusterService);
|
Mockito.verifyZeroInteractions(clusterService);
|
||||||
|
@ -204,9 +196,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, false);
|
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||||
|
|
||||||
assertEquals(1, step.getExecuteCount());
|
assertEquals(1, step.getExecuteCount());
|
||||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||||
|
@ -224,9 +215,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, false);
|
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||||
|
|
||||||
assertEquals(1, step.getExecuteCount());
|
assertEquals(1, step.getExecuteCount());
|
||||||
Mockito.verifyZeroInteractions(clusterService);
|
Mockito.verifyZeroInteractions(clusterService);
|
||||||
|
@ -243,10 +233,9 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
RuntimeException exception = expectThrows(RuntimeException.class,
|
RuntimeException exception = expectThrows(RuntimeException.class,
|
||||||
() -> runner.runPolicy(policyName, indexMetaData, indexSettings, false));
|
() -> runner.runPolicy(policyName, indexMetaData, null, false));
|
||||||
|
|
||||||
assertSame(expectedException, exception.getCause());
|
assertSame(expectedException, exception.getCause());
|
||||||
assertEquals(1, step.getExecuteCount());
|
assertEquals(1, step.getExecuteCount());
|
||||||
|
@ -264,9 +253,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
runner.runPolicy(policyName, indexMetaData, indexSettings, true);
|
runner.runPolicy(policyName, indexMetaData, null, true);
|
||||||
|
|
||||||
assertEquals(0, step.getExecuteCount());
|
assertEquals(0, step.getExecuteCount());
|
||||||
Mockito.verifyZeroInteractions(clusterService);
|
Mockito.verifyZeroInteractions(clusterService);
|
||||||
|
@ -281,10 +269,9 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
Settings indexSettings = Settings.builder().build();
|
|
||||||
|
|
||||||
IllegalStateException exception = expectThrows(IllegalStateException.class,
|
IllegalStateException exception = expectThrows(IllegalStateException.class,
|
||||||
() -> runner.runPolicy(policyName, indexMetaData, indexSettings, randomBoolean()));
|
() -> runner.runPolicy(policyName, indexMetaData, null, randomBoolean()));
|
||||||
assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]",
|
assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]",
|
||||||
exception.getMessage());
|
exception.getMessage());
|
||||||
Mockito.verifyZeroInteractions(clusterService);
|
Mockito.verifyZeroInteractions(clusterService);
|
||||||
|
@ -591,7 +578,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void performAction(IndexMetaData indexMetaData, Listener listener) {
|
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
|
||||||
executeCount++;
|
executeCount++;
|
||||||
if (exception == null) {
|
if (exception == null) {
|
||||||
listener.onResponse(willComplete);
|
listener.onResponse(willComplete);
|
||||||
|
|
Loading…
Reference in New Issue