From 53c3d040f7ef19c42eb03c1cd713012a26c219dc Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Fri, 12 Jan 2018 13:48:09 +0000 Subject: [PATCH] Add ReplicaAction for Index Lifecycle (#3535) * Add ReplicaAction for Index Lifecycle * Add validation --- .../xpack/indexlifecycle/ReplicasAction.java | 32 +- .../indexlifecycle/ReplicasActionTests.java | 276 ++++++++++++++++++ 2 files changed, 306 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ReplicasAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ReplicasAction.java index 07112f28166..4d4c26b2efb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ReplicasAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ReplicasAction.java @@ -5,12 +5,18 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -42,6 +48,9 @@ public class ReplicasAction implements LifecycleAction { } public ReplicasAction(int numberOfReplicas) { + if (numberOfReplicas < 0) { + throw new IllegalArgumentException("[" + NUMBER_OF_REPLICAS_FIELD.getPreferredName() + "] must be >= 0"); + } this.numberOfReplicas = numberOfReplicas; } @@ -69,8 +78,27 @@ public class ReplicasAction implements LifecycleAction { @Override public void execute(Index index, Client client, ClusterService clusterService, Listener listener) { - // NORELEASE: stub - listener.onSuccess(true); + IndexMetaData idxMeta = clusterService.state().metaData().getIndexSafe(index); + int currentNumberReplicas = idxMeta.getNumberOfReplicas(); + if (currentNumberReplicas == numberOfReplicas) { + boolean isAllocationCompleted = ActiveShardCount.ALL.enoughShardsActive(clusterService.state(), index.getName()); + listener.onSuccess(isAllocationCompleted); + } else { + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index.getName()) + .settings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)); + client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener() { + + @Override + public void onResponse(UpdateSettingsResponse response) { + listener.onSuccess(false); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } } public int getNumberOfReplicas() { diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ReplicasActionTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ReplicasActionTests.java index 19b76bb8a20..6dfd2c592ed 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ReplicasActionTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ReplicasActionTests.java @@ -5,9 +5,34 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.indexlifecycle.LifecycleAction.Listener; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; @@ -27,4 +52,255 @@ public class ReplicasActionTests extends AbstractSerializingTestCase instanceReader() { return ReplicasAction::new; } + + @Override + protected ReplicasAction mutateInstance(ReplicasAction instance) throws IOException { + return new ReplicasAction(instance.getNumberOfReplicas() + randomIntBetween(1, 5)); + } + + public void testInvalidNumReplicas() { + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, + () -> new ReplicasAction(randomIntBetween(-1000, -1))); + assertEquals("[" + ReplicasAction.NUMBER_OF_REPLICAS_FIELD.getPreferredName() + "] must be >= 0", exception.getMessage()); + } + + public void testExecuteDifferentReplicaCount() { + int existingNumReplicas = randomIntBetween(0, 10); + int newNumReplicas = randomValueOtherThan(existingNumReplicas, () -> randomIntBetween(0, 10)); + + ReplicasAction action = new ReplicasAction(newNumReplicas); + + IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLengthBetween(1, 20)) + .settings(Settings.builder().put("index.version.created", Version.CURRENT.id)).numberOfShards(randomIntBetween(1, 5)) + .numberOfReplicas(existingNumReplicas).build(); + Index index = indexMetadata.getIndex(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + ClusterState clusterstate = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + ClusterService clusterService = Mockito.mock(ClusterService.class); + + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + Mockito.doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + Settings expectedSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, newNumReplicas).build(); + UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, index.getName()); + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + } + + }).when(indicesClient).updateSettings(Mockito.any(), Mockito.any()); + Mockito.when(clusterService.state()).thenReturn(clusterstate); + + SetOnce actionCompleted = new SetOnce<>(); + action.execute(index, client, clusterService, new Listener() { + + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertEquals(false, actionCompleted.get()); + + Mockito.verify(client, Mockito.only()).admin(); + Mockito.verify(adminClient, Mockito.only()).indices(); + Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any()); + Mockito.verify(clusterService, Mockito.only()).state(); + } + + public void testExecuteUpdateReplicaCountFailure() { + int existingNumReplicas = randomIntBetween(0, 10); + int newNumReplicas = randomValueOtherThan(existingNumReplicas, () -> randomIntBetween(0, 10)); + + ReplicasAction action = new ReplicasAction(newNumReplicas); + Exception exception = new RuntimeException(); + + IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLengthBetween(1, 20)) + .settings(Settings.builder().put("index.version.created", Version.CURRENT.id)).numberOfShards(randomIntBetween(1, 5)) + .numberOfReplicas(existingNumReplicas).build(); + Index index = indexMetadata.getIndex(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + ClusterState clusterstate = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + ClusterService clusterService = Mockito.mock(ClusterService.class); + + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + Mockito.doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + Settings expectedSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, newNumReplicas).build(); + UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, index.getName()); + listener.onFailure(exception); + return null; + } + + }).when(indicesClient).updateSettings(Mockito.any(), Mockito.any()); + Mockito.when(clusterService.state()).thenReturn(clusterstate); + + SetOnce exceptionThrown = new SetOnce<>(); + action.execute(index, client, clusterService, new Listener() { + + @Override + public void onSuccess(boolean completed) { + throw new AssertionError("Unexpected method call"); + } + + @Override + public void onFailure(Exception e) { + exceptionThrown.set(true); + } + }); + + assertEquals(true, exceptionThrown.get()); + + Mockito.verify(client, Mockito.only()).admin(); + Mockito.verify(adminClient, Mockito.only()).indices(); + Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any()); + Mockito.verify(clusterService, Mockito.only()).state(); + } + + public void testExecuteAllocationNotComplete() { + + ReplicasAction action = createTestInstance(); + + int numberOfShards = randomIntBetween(1, 5); + int numberOfReplicas = action.getNumberOfReplicas(); + IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLengthBetween(1, 20)) + .settings(Settings.builder().put("index.version.created", Version.CURRENT.id)).numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas).build(); + Index index = indexMetadata.getIndex(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index); + for (int shard = 0; shard < numberOfShards; shard++) { + for (int replica = 0; replica < numberOfReplicas + 1; replica++) { + ShardRoutingState state; + if (replica == 0) { + state = ShardRoutingState.STARTED; + } else if ((replica == numberOfReplicas) || randomBoolean()) { + state = randomFrom(ShardRoutingState.UNASSIGNED, ShardRoutingState.INITIALIZING); + } else { + state = ShardRoutingState.STARTED; + } + String nodeId = "node" + replica; + if (ShardRoutingState.UNASSIGNED.equals(state)) { + nodeId = null; + } + indexRoutingTable.addShard(TestShardRouting.newShardRouting(new ShardId(index, shard), nodeId, replica == 0, state)); + } + } + ClusterState clusterstate = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + ClusterService clusterService = Mockito.mock(ClusterService.class); + + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + Mockito.when(clusterService.state()).thenReturn(clusterstate); + + SetOnce actionCompleted = new SetOnce<>(); + action.execute(index, client, clusterService, new Listener() { + + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertEquals(false, actionCompleted.get()); + + Mockito.verify(clusterService, Mockito.times(2)).state(); + Mockito.verify(client, Mockito.never()).admin(); + Mockito.verify(adminClient, Mockito.never()).indices(); + Mockito.verify(indicesClient, Mockito.never()).updateSettings(Mockito.any(), Mockito.any()); + } + + public void testExecuteAllocationComplete() { + + ReplicasAction action = createTestInstance(); + + int numberOfShards = randomIntBetween(1, 5); + int numberOfReplicas = action.getNumberOfReplicas(); + IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLengthBetween(1, 20)) + .settings(Settings.builder().put("index.version.created", Version.CURRENT.id)).numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas).build(); + Index index = indexMetadata.getIndex(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index); + for (int shard = 0; shard < numberOfShards; shard++) { + for (int replica = 0; replica < numberOfReplicas + 1; replica++) { + ShardRoutingState state = ShardRoutingState.STARTED; + String nodeId = "node" + replica; + indexRoutingTable.addShard(TestShardRouting.newShardRouting(new ShardId(index, shard), nodeId, replica == 0, state)); + } + } + ClusterState clusterstate = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + ClusterService clusterService = Mockito.mock(ClusterService.class); + + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + Mockito.when(clusterService.state()).thenReturn(clusterstate); + + SetOnce actionCompleted = new SetOnce<>(); + action.execute(index, client, clusterService, new Listener() { + + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertEquals(true, actionCompleted.get()); + + Mockito.verify(clusterService, Mockito.times(2)).state(); + Mockito.verify(client, Mockito.never()).admin(); + Mockito.verify(adminClient, Mockito.never()).indices(); + Mockito.verify(indicesClient, Mockito.never()).updateSettings(Mockito.any(), Mockito.any()); + } }