Changes update replica and alllocation to async steps

Also renames EnoughShardsWaitStep to ReplicasAllocatedStep, removes it from the allocate action and adds a check that th number of replicas in the cluster state is correct to it.
This commit is contained in:
Colin Goodheart-Smithe 2018-04-10 16:20:29 +01:00
parent aa198b637e
commit 8d91f197d4
12 changed files with 390 additions and 232 deletions

View File

@ -113,13 +113,12 @@ public class AllocateAction implements LifecycleAction {
@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey enoughKey = new StepKey(phase, NAME, EnoughShardsWaitStep.NAME);
StepKey allocateKey = new StepKey(phase, NAME, UpdateAllocationSettingsStep.NAME);
StepKey allocationRoutedKey = new StepKey(phase, NAME, AllocationRoutedStep.NAME);
UpdateAllocationSettingsStep allocateStep = new UpdateAllocationSettingsStep(allocateKey, allocationRoutedKey,
include, exclude, require);
UpdateAllocationSettingsStep allocateStep = new UpdateAllocationSettingsStep(allocateKey, allocationRoutedKey, client, include,
exclude, require);
AllocationRoutedStep routedCheckStep = new AllocationRoutedStep(allocationRoutedKey, nextStepKey);
return Arrays.asList(new EnoughShardsWaitStep(enoughKey, allocateKey), allocateStep, routedCheckStep);
return Arrays.asList(allocateStep, routedCheckStep);
}
@Override

View File

@ -1,24 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.index.Index;
public class EnoughShardsWaitStep extends ClusterStateWaitStep {
public static final String NAME = "enough-shards-allocated";
public EnoughShardsWaitStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
// We only want to make progress if all shards are active
return ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName());
}
}

View File

@ -30,8 +30,8 @@ public class ForceMergeAction implements LifecycleAction {
private static final ConstructingObjectParser<ForceMergeAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
false, a -> {
int maxNumSegments = (Integer) a[0];
boolean bestCompression = (Boolean) a[1];
int maxNumSegments = (int) a[0];
boolean bestCompression = a[1] == null ? false : (boolean) a[1];
return new ForceMergeAction(maxNumSegments, bestCompression);
});

View File

@ -74,9 +74,9 @@ public class ReplicasAction implements LifecycleAction {
@Override
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
StepKey updateReplicasKey = new StepKey(phase, NAME, UpdateReplicaSettingsStep.NAME);
StepKey enoughKey = new StepKey(phase, NAME, EnoughShardsWaitStep.NAME);
return Arrays.asList(new UpdateReplicaSettingsStep(updateReplicasKey, enoughKey, numberOfReplicas),
new EnoughShardsWaitStep(enoughKey, nextStepKey));
StepKey enoughKey = new StepKey(phase, NAME, ReplicasAllocatedStep.NAME);
return Arrays.asList(new UpdateReplicaSettingsStep(updateReplicasKey, enoughKey, client, numberOfReplicas),
new ReplicasAllocatedStep(enoughKey, nextStepKey, numberOfReplicas));
}
public int getNumberOfReplicas() {

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import java.util.Objects;
public class ReplicasAllocatedStep extends ClusterStateWaitStep {
public static final String NAME = "enough-shards-allocated";
private int numberReplicas;
public ReplicasAllocatedStep(StepKey key, StepKey nextStepKey, int numberReplicas) {
super(key, nextStepKey);
this.numberReplicas = numberReplicas;
}
int getNumberReplicas() {
return numberReplicas;
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
IndexMetaData idxMeta = clusterState.metaData().index(index);
if (idxMeta == null) {
throw new IndexNotFoundException("Index not found when executing " + getKey().getAction() + " lifecycle action.",
index.getName());
}
// We only want to make progress if the cluster state reflects the number of replicas change and all shards are active
return idxMeta.getNumberOfReplicas() == numberReplicas && ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName());
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), numberReplicas);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
ReplicasAllocatedStep other = (ReplicasAllocatedStep) obj;
return super.equals(obj) &&
Objects.equals(numberReplicas, other.numberReplicas);
}
}

View File

@ -5,57 +5,41 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import java.util.Map;
import java.util.Objects;
public class UpdateAllocationSettingsStep extends ClusterStateActionStep {
public class UpdateAllocationSettingsStep extends AsyncActionStep {
public static final String NAME = "update-allocation";
private final Map<String, String> include;
private final Map<String, String> exclude;
private final Map<String, String> require;
public UpdateAllocationSettingsStep(StepKey key, StepKey nextStepKey, Map<String, String> include,
public UpdateAllocationSettingsStep(StepKey key, StepKey nextStepKey, Client client, Map<String, String> include,
Map<String, String> exclude, Map<String, String> require) {
super(key, nextStepKey);
super(key, nextStepKey, client);
this.include = include;
this.exclude = exclude;
this.require = require;
}
@Override
public ClusterState performAction(Index index, ClusterState clusterState) {
IndexMetaData idxMeta = clusterState.metaData().index(index);
if (idxMeta == null) {
return clusterState;
}
Settings existingSettings = idxMeta.getSettings();
public void performAction(Index index, Listener listener) {
Settings.Builder newSettings = Settings.builder();
addMissingAttrs(include, IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey(), existingSettings, newSettings);
addMissingAttrs(exclude, IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), existingSettings, newSettings);
addMissingAttrs(require, IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey(), existingSettings, newSettings);
return ClusterState.builder(clusterState)
.metaData(MetaData.builder(clusterState.metaData())
.updateSettings(newSettings.build(), index.getName())).build();
}
include.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value));
exclude.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
require.forEach((key, value) -> newSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
/**
* Inspects the <code>existingSettings</code> and adds any attributes that
* are missing for the given <code>settingsPrefix</code> to the
* <code>newSettingsBuilder</code>.
*/
static void addMissingAttrs(Map<String, String> newAttrs, String settingPrefix, Settings existingSettings,
Settings.Builder newSettingsBuilder) {
newAttrs.entrySet().stream().filter(e -> {
String existingValue = existingSettings.get(settingPrefix + e.getKey());
return existingValue == null || (existingValue.equals(e.getValue()) == false);
}).forEach(e -> newSettingsBuilder.put(settingPrefix + e.getKey(), e.getValue()));
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index.getName()).settings(newSettings);
getClient().admin().indices().updateSettings(updateSettingsRequest,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}
Map<String, String> getInclude() {

View File

@ -5,35 +5,31 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import java.util.Objects;
public class UpdateReplicaSettingsStep extends ClusterStateActionStep {
public class UpdateReplicaSettingsStep extends AsyncActionStep {
public static final String NAME = "update-replicas";
private int numberOfReplicas;
public UpdateReplicaSettingsStep(StepKey key, StepKey nextStepKey, int numberOfReplicas) {
super(key, nextStepKey);
public UpdateReplicaSettingsStep(StepKey key, StepKey nextStepKey, Client client, int numberOfReplicas) {
super(key, nextStepKey, client);
this.numberOfReplicas = numberOfReplicas;
}
@Override
public ClusterState performAction(Index index, ClusterState clusterState) {
IndexMetaData idxMeta = clusterState.metaData().index(index);
if (idxMeta == null) {
return clusterState;
}
Settings.Builder newSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas);
return ClusterState.builder(clusterState)
.metaData(MetaData.builder(clusterState.metaData())
.updateSettings(newSettings.build(), index.getName())).build();
public void performAction(Index index, Listener listener) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index.getName())
.settings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas));
getClient().admin().indices().updateSettings(updateSettingsRequest,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}
public int getNumberOfReplicas() {

View File

@ -105,21 +105,17 @@ public class AllocateActionTests extends AbstractSerializingTestCase<AllocateAct
List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertNotNull(steps);
assertEquals(3, steps.size());
StepKey expectedFirstStepKey = new StepKey(phase, AllocateAction.NAME, EnoughShardsWaitStep.NAME);
StepKey expectedSecondStepKey = new StepKey(phase, AllocateAction.NAME, UpdateAllocationSettingsStep.NAME);
StepKey expectedThirdStepKey = new StepKey(phase, AllocateAction.NAME, AllocationRoutedStep.NAME);
EnoughShardsWaitStep firstStep = (EnoughShardsWaitStep) steps.get(0);
StepKey expectedFirstStepKey = new StepKey(phase, AllocateAction.NAME, UpdateAllocationSettingsStep.NAME);
StepKey expectedSecondStepKey = new StepKey(phase, AllocateAction.NAME, AllocationRoutedStep.NAME);
UpdateAllocationSettingsStep firstStep = (UpdateAllocationSettingsStep) steps.get(1);
assertEquals(expectedFirstStepKey, firstStep.getKey());
assertEquals(expectedSecondStepKey, firstStep.getNextStepKey());
UpdateAllocationSettingsStep secondStep = (UpdateAllocationSettingsStep) steps.get(1);
assertEquals(action.getInclude(), firstStep.getInclude());
assertEquals(action.getExclude(), firstStep.getExclude());
assertEquals(action.getRequire(), firstStep.getRequire());
AllocationRoutedStep secondStep = (AllocationRoutedStep) steps.get(2);
assertEquals(expectedSecondStepKey, secondStep.getKey());
assertEquals(expectedThirdStepKey, secondStep.getNextStepKey());
assertEquals(action.getInclude(), secondStep.getInclude());
assertEquals(action.getExclude(), secondStep.getExclude());
assertEquals(action.getRequire(), secondStep.getRequire());
AllocationRoutedStep thirdStep = (AllocationRoutedStep) steps.get(2);
assertEquals(expectedThirdStepKey, thirdStep.getKey());
assertEquals(nextStepKey, thirdStep.getNextStepKey());
assertEquals(nextStepKey, secondStep.getNextStepKey());
}
}

View File

@ -50,12 +50,12 @@ public class ReplicasActionTests extends AbstractSerializingTestCase<ReplicasAct
assertNotNull(steps);
assertEquals(2, steps.size());
StepKey expectedFirstStepKey = new StepKey(phase, ReplicasAction.NAME, UpdateReplicaSettingsStep.NAME);
StepKey expectedSecondStepKey = new StepKey(phase, ReplicasAction.NAME, EnoughShardsWaitStep.NAME);
StepKey expectedSecondStepKey = new StepKey(phase, ReplicasAction.NAME, ReplicasAllocatedStep.NAME);
UpdateReplicaSettingsStep firstStep = (UpdateReplicaSettingsStep) steps.get(0);
assertEquals(expectedFirstStepKey, firstStep.getKey());
assertEquals(expectedSecondStepKey, firstStep.getNextStepKey());
assertEquals(action.getNumberOfReplicas(), firstStep.getNumberOfReplicas());
EnoughShardsWaitStep secondStep = (EnoughShardsWaitStep) steps.get(1);
ReplicasAllocatedStep secondStep = (ReplicasAllocatedStep) steps.get(1);
assertEquals(expectedSecondStepKey, secondStep.getKey());
assertEquals(nextStepKey, secondStep.getNextStepKey());
}

View File

@ -19,49 +19,55 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
public class EnoughShardsWaitStepTests extends ESTestCase {
public class ReplicasAllocatedStepTests extends ESTestCase {
public EnoughShardsWaitStep createRandomInstance() {
public ReplicasAllocatedStep createRandomInstance() {
StepKey stepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
StepKey nextStepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
return new EnoughShardsWaitStep(stepKey, nextStepKey);
int numberReplicas = randomIntBetween(0, 100);
return new ReplicasAllocatedStep(stepKey, nextStepKey, numberReplicas);
}
public EnoughShardsWaitStep mutateInstance(EnoughShardsWaitStep instance) {
public ReplicasAllocatedStep mutateInstance(ReplicasAllocatedStep instance) {
StepKey key = instance.getKey();
StepKey nextKey = instance.getNextStepKey();
switch (between(0, 1)) {
int numberReplicas = instance.getNumberReplicas();
switch (between(0, 2)) {
case 0:
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
break;
case 1:
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
break;
case 2:
numberReplicas += randomIntBetween(1, 5);
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new EnoughShardsWaitStep(key, nextKey);
return new ReplicasAllocatedStep(key, nextKey, numberReplicas);
}
public void testHashcodeAndEquals() {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createRandomInstance(),
instance -> new EnoughShardsWaitStep(instance.getKey(), instance.getNextStepKey()), this::mutateInstance);
instance -> new ReplicasAllocatedStep(instance.getKey(), instance.getNextStepKey(), instance.getNumberReplicas()),
this::mutateInstance);
}
public void testConditionMet() {
ReplicasAllocatedStep step = createRandomInstance();
IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0).build();
.numberOfReplicas(step.getNumberReplicas()).build();
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT).build())
.put(IndexMetaData.builder(indexMetadata))
@ -82,16 +88,15 @@ public class EnoughShardsWaitStepTests extends ESTestCase {
nodeId, true, ShardRoutingState.STARTED)))
.build())
.build();
EnoughShardsWaitStep step = createRandomInstance();
assertTrue(step.isConditionMet(indexMetadata.getIndex(), clusterState));
}
public void testConditionNotMet() {
public void testConditionNotMetAllocation() {
ReplicasAllocatedStep step = createRandomInstance();
IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0).build();
.numberOfReplicas(step.getNumberReplicas()).build();
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT).build())
.put(IndexMetaData.builder(indexMetadata))
@ -113,7 +118,47 @@ public class EnoughShardsWaitStepTests extends ESTestCase {
.build())
.build();
EnoughShardsWaitStep step = createRandomInstance();
assertFalse(step.isConditionMet(indexMetadata.getIndex(), clusterState));
}
public void testConditionNotMetNumberReplicas() {
ReplicasAllocatedStep step = createRandomInstance();
IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(randomValueOtherThan(step.getNumberReplicas(), () -> randomIntBetween(0, 100))).build();
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT).build())
.put(IndexMetaData.builder(indexMetadata))
.build();
Index index = indexMetadata.getIndex();
String nodeId = randomAlphaOfLength(10);
DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT)
.put(Node.NODE_MASTER_SETTING.getKey(), true).build(),
new TransportAddress(TransportAddress.META_ADDRESS, 9300), nodeId);
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(index).addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0),
nodeId, true, ShardRoutingState.STARTED)))
.build())
.build();
assertFalse(step.isConditionMet(indexMetadata.getIndex(), clusterState));
}
public void testConditionIndexMissing() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).build();
ReplicasAllocatedStep step = createRandomInstance();
IndexNotFoundException thrownException = expectThrows(IndexNotFoundException.class, () -> step.isConditionMet(index, clusterState));
assertEquals("Index not found when executing " + step.getKey().getAction() + " lifecycle action.", thrownException.getMessage());
assertEquals(index.getName(), thrownException.getIndex().getName());
}
}

View File

@ -6,27 +6,38 @@
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep.Listener;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
public class UpdateAllocationSettingsStepTests extends ESTestCase {
private Client client;
@Before
public void setup() {
client = Mockito.mock(Client.class);
}
public UpdateAllocationSettingsStep createRandomInstance() {
StepKey stepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
StepKey nextStepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
@ -34,7 +45,7 @@ public class UpdateAllocationSettingsStepTests extends ESTestCase {
Map<String, String> exclude = AllocateActionTests.randomMap(0, 10);
Map<String, String> require = AllocateActionTests.randomMap(0, 10);
return new UpdateAllocationSettingsStep(stepKey, nextStepKey, include, exclude, require);
return new UpdateAllocationSettingsStep(stepKey, nextStepKey, client, include, exclude, require);
}
public UpdateAllocationSettingsStep mutateInstance(UpdateAllocationSettingsStep instance) {
@ -67,105 +78,120 @@ public class UpdateAllocationSettingsStepTests extends ESTestCase {
throw new AssertionError("Illegal randomisation branch");
}
return new UpdateAllocationSettingsStep(key, nextKey, include, exclude, require);
return new UpdateAllocationSettingsStep(key, nextKey, client, include, exclude, require);
}
public void testHashcodeAndEquals() {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createRandomInstance(),
instance -> new UpdateAllocationSettingsStep(instance.getKey(), instance.getNextStepKey(), instance.getInclude(),
instance.getExclude(), instance.getRequire()),
this::mutateInstance);
EqualsHashCodeTestUtils
.checkEqualsAndHashCode(
createRandomInstance(), instance -> new UpdateAllocationSettingsStep(instance.getKey(), instance.getNextStepKey(),
instance.getClient(), instance.getInclude(), instance.getExclude(), instance.getRequire()),
this::mutateInstance);
}
public void testPerformAction() {
IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0).build();
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT).build())
.put(IndexMetaData.builder(indexMetadata))
.build();
Index index = indexMetadata.getIndex();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
UpdateAllocationSettingsStep step = createRandomInstance();
ClusterState newState = step.performAction(index, clusterState);
assertNotSame(clusterState, newState);
assertThat(getRouting(index, newState, IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey()), equalTo(step.getInclude()));
assertThat(getRouting(index, newState, IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey()), equalTo(step.getExclude()));
assertThat(getRouting(index, newState, IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey()), equalTo(step.getRequire()));
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings.Builder expectedSettings = Settings.builder();
step.getInclude().forEach(
(key, value) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value));
step.getExclude().forEach(
(key, value) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
step.getRequire().forEach(
(key, value) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings.build(), index.getName());
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
return null;
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
SetOnce<Boolean> actionCompleted = new SetOnce<>();
step.performAction(index, new Listener() {
@Override
public void onResponse(boolean complete) {
actionCompleted.set(complete);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertEquals(true, actionCompleted.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
}
public void testPerformActionNoIndex() {
MetaData metaData = MetaData.builder().persistentSettings(settings(Version.CURRENT).build()).build();
Index index = new Index("invalid_index", "invalid_index_id");
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
public void testPerformActionFailure() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Exception exception = new RuntimeException();
UpdateAllocationSettingsStep step = createRandomInstance();
ClusterState newState = step.performAction(index, clusterState);
assertSame(clusterState, newState);
}
public void testAddMissingAttr() {
String prefix = randomFrom(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey(),
IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(),
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey());
Map<String, String> newAttrs = Collections.singletonMap(randomAlphaOfLength(4), randomAlphaOfLength(5));
Settings existingSettings = Settings.builder()
.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "box_type", "foo")
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + "box_type", "bar")
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "box_type", "baz").build();
Settings.Builder newSettingsBuilder = Settings.builder();
UpdateAllocationSettingsStep.addMissingAttrs(newAttrs, prefix, existingSettings, newSettingsBuilder);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
Settings.Builder expectedSettingsBuilder = Settings.builder();
newAttrs.forEach((k, v) -> expectedSettingsBuilder.put(prefix + k, v));
assertThat(newSettingsBuilder.build(), equalTo(expectedSettingsBuilder.build()));
}
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(new Answer<Void>() {
public void testAddMissingAttrDiffenerentValue() {
String prefix = randomFrom(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey(),
IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(),
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey());
String newKey = randomAlphaOfLength(4);
String newValue = randomAlphaOfLength(5);
Map<String, String> newAttrs = Collections.singletonMap(newKey, newValue);
Settings existingSettings = Settings.builder()
.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "box_type", "foo")
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + "box_type", "bar")
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "box_type", "baz")
.put(prefix + newKey, "1234").build();
Settings.Builder newSettingsBuilder = Settings.builder();
UpdateAllocationSettingsStep.addMissingAttrs(newAttrs, prefix, existingSettings, newSettingsBuilder);
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings.Builder expectedSettings = Settings.builder();
step.getInclude().forEach(
(key, value) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value));
step.getExclude().forEach(
(key, value) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
step.getRequire().forEach(
(key, value) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings.build(), index.getName());
listener.onFailure(exception);
return null;
}
Settings.Builder expectedSettingsBuilder = Settings.builder();
newAttrs.forEach((k, v) -> expectedSettingsBuilder.put(prefix + k, v));
assertThat(newSettingsBuilder.build(), equalTo(expectedSettingsBuilder.build()));
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
public void testAddMissingAttrNoneMissing() {
String prefix = randomFrom(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey(),
IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(),
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey());
String newKey = randomAlphaOfLength(4);
String newValue = randomAlphaOfLength(5);
Map<String, String> newAttrs = Collections.singletonMap(newKey, newValue);
Settings existingSettings = Settings.builder()
.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "box_type", "foo")
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + "box_type", "bar")
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "box_type", "baz")
.put(prefix + newKey, newValue).build();
Settings.Builder newSettingsBuilder = Settings.builder();
UpdateAllocationSettingsStep.addMissingAttrs(newAttrs, prefix, existingSettings, newSettingsBuilder);
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
step.performAction(index, new Listener() {
Settings.Builder expectedSettingsBuilder = Settings.builder();
assertThat(newSettingsBuilder.build(), equalTo(expectedSettingsBuilder.build()));
}
@Override
public void onResponse(boolean complete) {
throw new AssertionError("Unexpected method call");
}
private Map<String, String> getRouting(Index index, ClusterState clusterState, String settingPrefix) {
Settings includeSettings = clusterState.metaData().index(index).getSettings()
.getByPrefix(settingPrefix);
return includeSettings.keySet().stream().collect(Collectors.toMap(Function.identity(), includeSettings::get));
@Override
public void onFailure(Exception e) {
assertSame(exception, e);
exceptionThrown.set(true);
}
});
assertEquals(true, exceptionThrown.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
}
}

View File

@ -6,23 +6,40 @@
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep.Listener;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class UpdateReplicaSettingsStepTests extends ESTestCase {
private Client client;
@Before
public void setup() {
client = Mockito.mock(Client.class);
}
public UpdateReplicaSettingsStep createRandomInstance() {
StepKey stepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
StepKey nextStepKey = new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
return new UpdateReplicaSettingsStep(stepKey, nextStepKey, randomIntBetween(0, 100));
return new UpdateReplicaSettingsStep(stepKey, nextStepKey, client, randomIntBetween(0, 100));
}
public UpdateReplicaSettingsStep mutateInstance(UpdateReplicaSettingsStep instance) {
@ -44,45 +61,107 @@ public class UpdateReplicaSettingsStepTests extends ESTestCase {
throw new AssertionError("Illegal randomisation branch");
}
return new UpdateReplicaSettingsStep(key, nextKey, replicas);
return new UpdateReplicaSettingsStep(key, nextKey, client, replicas);
}
public void testHashcodeAndEquals() {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createRandomInstance(),
instance -> new UpdateReplicaSettingsStep(instance.getKey(), instance.getNextStepKey(), instance.getNumberOfReplicas()),
this::mutateInstance);
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createRandomInstance(), instance -> new UpdateReplicaSettingsStep(instance.getKey(),
instance.getNextStepKey(), instance.getClient(), instance.getNumberOfReplicas()), this::mutateInstance);
}
public void testPerformAction() {
IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0).build();
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT).build())
.put(IndexMetaData.builder(indexMetadata))
.build();
Index index = indexMetadata.getIndex();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
UpdateReplicaSettingsStep step = createRandomInstance();
ClusterState newState = step.performAction(index, clusterState);
assertNotSame(clusterState, newState);
IndexMetaData newIndexMetadata = newState.metaData().index(index);
assertNotNull(newIndexMetadata);
assertNotSame(indexMetadata, newIndexMetadata);
assertTrue(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(newIndexMetadata.getSettings()));
assertEquals(step.getNumberOfReplicas(),
(int) IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(newIndexMetadata.getSettings()));
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings expectedSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, step.getNumberOfReplicas())
.build();
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, index.getName());
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
return null;
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
SetOnce<Boolean> actionCompleted = new SetOnce<>();
step.performAction(index, new Listener() {
@Override
public void onResponse(boolean complete) {
actionCompleted.set(complete);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertEquals(true, actionCompleted.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
}
public void testPerformActionNoIndex() {
MetaData metaData = MetaData.builder().persistentSettings(settings(Version.CURRENT).build()).build();
Index index = new Index("invalid_index", "invalid_index_id");
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
public void testPerformActionFailure() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Exception exception = new RuntimeException();
UpdateReplicaSettingsStep step = createRandomInstance();
ClusterState newState = step.performAction(index, clusterState);
assertSame(clusterState, newState);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings expectedSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, step.getNumberOfReplicas())
.build();
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, index.getName());
listener.onFailure(exception);
return null;
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
step.performAction(index, new Listener() {
@Override
public void onResponse(boolean complete) {
throw new AssertionError("Unexpected method call");
}
@Override
public void onFailure(Exception e) {
assertSame(exception, e);
exceptionThrown.set(true);
}
});
assertEquals(true, exceptionThrown.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
}
}