Add the Shrink Action to Index Lifecycle Management (#3491)

This does the following in sequential service polls

1. sets the index to read-only and runs shrink with a modified `index.lifecycle.name` setting set to `null`.
2. checks to see if shrink is complete, if it is...
    b. set target index's `index.lifecycle.*` settings to the original index's values.
3. if not complete, just wait till next iteration
4. if operating on shrunken index, delete old index and add it as an alias to shrunken index
This commit is contained in:
Tal Levy 2018-01-17 11:22:06 -08:00 committed by GitHub
parent ff883d574b
commit 84fd234835
4 changed files with 669 additions and 22 deletions

View File

@ -5,38 +5,73 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.Objects;
/**
* A {@link LifecycleAction} which shrinks the index.
*/
public class ShrinkAction implements LifecycleAction {
public static final String NAME = "shrink";
public static final ParseField NUMBER_OF_SHARDS_FIELD = new ParseField("number_of_shards");
private static final ObjectParser<ShrinkAction, Void> PARSER = new ObjectParser<>(NAME, ShrinkAction::new);
private static final Logger logger = ESLoggerFactory.getLogger(ShrinkAction.class);
private static final String SHRUNK_INDEX_NAME_PREFIX = "shrunk-";
private static final ConstructingObjectParser<ShrinkAction, CreateIndexRequest> PARSER =
new ConstructingObjectParser<>(NAME, a -> new ShrinkAction((Integer) a[0]));
public static ShrinkAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
static {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_SHARDS_FIELD);
}
public ShrinkAction() {
private int numberOfShards;
public static ShrinkAction parse(XContentParser parser) throws IOException {
return PARSER.parse(parser, new CreateIndexRequest());
}
public ShrinkAction(int numberOfShards) {
if (numberOfShards <= 0) {
throw new IllegalArgumentException("[" + NUMBER_OF_SHARDS_FIELD.getPreferredName() + "] must be greater than 0");
}
this.numberOfShards = numberOfShards;
}
public ShrinkAction(StreamInput in) throws IOException {
this.numberOfShards = in.readVInt();
}
int getNumberOfShards() {
return numberOfShards;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numberOfShards);
}
@Override
@ -47,35 +82,123 @@ public class ShrinkAction implements LifecycleAction {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NUMBER_OF_SHARDS_FIELD.getPreferredName(), numberOfShards);
builder.endObject();
return builder;
}
/**
* Executes the Shrink Action.
*
* This function first checks whether the target shrunk index exists already, if it does not, then
* it will set the index to read-only and issue a resize request.
*
* Since the shrink response is not returned after a successful shrunk operation, we must poll to see if
* all the shards of the newly shrunk index are initialized. If so, then we can return the index to read-write
* and tell the listener that we have completed the action.
*
* @param index
* the {@link Index} on which to perform the action.
* @param client
* the {@link Client} to use for making changes to the index.
* @param clusterService
* the {@link ClusterService} to retrieve the current cluster state from.
* @param listener
* the {@link LifecycleAction.Listener} to return completion or failure responses to.
*/
@Override
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
// NORELEASE: stub
listener.onSuccess(true);
String targetIndexName = SHRUNK_INDEX_NAME_PREFIX + index.getName();
ClusterState clusterState = clusterService.state();
IndexMetaData indexMetaData = clusterState.metaData().index(index.getName());
String sourceIndexName = IndexMetaData.INDEX_SHRINK_SOURCE_NAME.get(indexMetaData.getSettings());
boolean isShrunkIndex = index.getName().equals(SHRUNK_INDEX_NAME_PREFIX + sourceIndexName);
IndexMetaData shrunkIndexMetaData = clusterState.metaData().index(targetIndexName);
if (isShrunkIndex) {
// We are currently managing the shrunken index. This means all previous operations were successful and
// the original index is deleted. It is important to add an alias from the original index name to the shrunken
// index so that previous actions will still succeed.
boolean aliasAlreadyExists = indexMetaData.getAliases().values().contains(AliasMetaData.builder(sourceIndexName).build());
boolean sourceIndexDeleted = clusterState.metaData().hasIndex(sourceIndexName) == false;
if (sourceIndexDeleted && aliasAlreadyExists) {
listener.onSuccess(true);
} else {
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest()
.addAliasAction(IndicesAliasesRequest.AliasActions.removeIndex().index(sourceIndexName))
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(index.getName()).alias(sourceIndexName));
client.admin().indices().aliases(aliasesRequest, ActionListener.wrap(response -> {
listener.onSuccess(true);
}, listener::onFailure));
}
} else if (shrunkIndexMetaData == null) {
// Shrunken index is not present yet, it is time to issue to shrink request
ResizeRequest resizeRequest = new ResizeRequest(targetIndexName, index.getName());
resizeRequest.getTargetIndexRequest().settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, indexMetaData.getNumberOfReplicas())
.build());
indexMetaData.getAliases().values().spliterator().forEachRemaining(aliasMetaDataObjectCursor -> {
resizeRequest.getTargetIndexRequest().alias(new Alias(aliasMetaDataObjectCursor.value.alias()));
});
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder()
.put(IndexMetaData.SETTING_BLOCKS_WRITE, true).build(), index.getName());
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(r -> {
client.admin().indices().resizeIndex(resizeRequest, ActionListener.wrap(
resizeResponse -> {
if (resizeResponse.isAcknowledged()) {
listener.onSuccess(false);
} else {
listener.onFailure(new IllegalStateException("Shrink request failed to be acknowledged"));
}
}, listener::onFailure));
}, listener::onFailure));
} else if (index.getName().equals(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.get(shrunkIndexMetaData.getSettings())) == false) {
// The target shrunken index exists, but it was not shrunk from our managed index. This means
// some external actions were done to create this index, and so we cannot progress with the shrink
// action until this is resolved.
listener.onFailure(new IllegalStateException("Cannot shrink index [" + index.getName() + "] because target " +
"index [" + targetIndexName + "] already exists."));
} else if (ActiveShardCount.ALL.enoughShardsActive(clusterService.state(), targetIndexName)) {
if (indexMetaData.getSettings().get("index.lifecycle.name")
.equals(shrunkIndexMetaData.getSettings().get("index.lifecycle.name"))) {
// Since both the shrunken and original indices co-exist, do nothing and wait until
// the final step of the shrink action is completed and this original index is deleted.
listener.onSuccess(false);
} else {
// Since all shards of the shrunken index are active, it is safe to continue forward
// and begin swapping the indices by inheriting the lifecycle management to the new shrunken index.
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder()
.put("index.lifecycle.name", indexMetaData.getSettings().get("index.lifecycle.name"))
.put("index.lifecycle.phase", indexMetaData.getSettings().get("index.lifecycle.phase"))
.put("index.lifecycle.action", indexMetaData.getSettings().get("index.lifecycle.action")).build(), targetIndexName);
client.admin().indices().updateSettings(updateSettingsRequest,
ActionListener.wrap(r -> listener.onSuccess(false) , listener::onFailure));
}
} else {
// We are here because both the shrunken and original indices exist, but the shrunken index is not
// fully active yet. This means that we wait for another poll iteration of execute to check the
// state again.
logger.debug("index [" + index.getName() + "] has been shrunk to shrunken-index [" + targetIndexName + "], but" +
"shrunken index is not fully active yet");
listener.onSuccess(false);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShrinkAction that = (ShrinkAction) o;
return Objects.equals(numberOfShards, that.numberOfShards);
}
@Override
public int hashCode() {
return 1;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
return true;
return Objects.hash(numberOfShards);
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -149,6 +149,7 @@ public class IndexLifecycle extends Plugin {
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReplicasAction.NAME), ReplicasAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse));
}

View File

@ -5,11 +5,49 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper;
import org.elasticsearch.action.admin.indices.shrink.ResizeAction;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
public class ShrinkActionTests extends AbstractSerializingTestCase<ShrinkAction> {
@ -20,11 +58,495 @@ public class ShrinkActionTests extends AbstractSerializingTestCase<ShrinkAction>
@Override
protected ShrinkAction createTestInstance() {
return new ShrinkAction();
return new ShrinkAction(randomIntBetween(1, 100));
}
@Override
protected ShrinkAction mutateInstance(ShrinkAction action) {
return new ShrinkAction(action.getNumberOfShards() + randomIntBetween(1, 2));
}
@Override
protected Reader<ShrinkAction> instanceReader() {
return ShrinkAction::new;
}
public void testNonPositiveShardNumber() {
Exception e = expectThrows(Exception.class, () -> new ShrinkAction(randomIntBetween(-100, 0)));
assertThat(e.getMessage(), equalTo("[number_of_shards] must be greater than 0"));
}
public void testExecuteSuccessfullyCompleted() {
String originalIndexName = randomAlphaOfLengthBetween(1, 20);
Index index = new Index("shrunk-" + originalIndexName, randomAlphaOfLengthBetween(1, 20));
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData originalIndexMetaData = IndexMetaData.builder(originalIndexName)
.settings(settings(Version.CURRENT)).numberOfReplicas(0).numberOfShards(1).build();
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, originalIndexName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata).fPut(originalIndexName, originalIndexMetaData);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(invocationOnMock -> {
IndicesAliasesRequest request = (IndicesAliasesRequest) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<IndicesAliasesResponse> listener = (ActionListener<IndicesAliasesResponse>) invocationOnMock.getArguments()[1];
IndicesAliasesResponse response = IndicesAliasesAction.INSTANCE.newResponse();
response.readFrom(StreamInput.wrap(new byte[] { 1 }));
assertThat(request.getAliasActions().size(), equalTo(2));
assertThat(request.getAliasActions().get(0).actionType(), equalTo(IndicesAliasesRequest.AliasActions.Type.REMOVE_INDEX));
assertThat(request.getAliasActions().get(0).indices(), equalTo(new String[] { originalIndexName }));
assertThat(request.getAliasActions().get(1).actionType(), equalTo(IndicesAliasesRequest.AliasActions.Type.ADD));
assertThat(request.getAliasActions().get(1).indices(), equalTo(new String[] { index.getName() }));
assertThat(request.getAliasActions().get(1).aliases(), equalTo(new String[] { originalIndexName }));
listener.onResponse(response);
return null;
}).when(indicesClient).aliases(any(), any());
SetOnce<Boolean> actionCompleted = new SetOnce<>();
ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10));
action.execute(index, client, clusterService, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertTrue(actionCompleted.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).aliases(any(), any());
}
public void testExecuteAlreadyCompletedAndRunAgain() {
String originalIndexName = randomAlphaOfLengthBetween(1, 20);
Index index = new Index("shrunk-" + originalIndexName, randomAlphaOfLengthBetween(1, 20));
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.putAlias(AliasMetaData.builder(originalIndexName).build())
.settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, originalIndexName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
SetOnce<Boolean> actionCompleted = new SetOnce<>();
ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10));
action.execute(index, client, clusterService, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertTrue(actionCompleted.get());
Mockito.verify(client, Mockito.never()).admin();
}
public void testExecuteOriginalIndexAliasFailure() {
String originalIndexName = randomAlphaOfLengthBetween(1, 20);
Index index = new Index("shrunk-" + originalIndexName, randomAlphaOfLengthBetween(1, 20));
Index targetIndex = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, originalIndexName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(targetIndex).addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true,
ShardRoutingState.STARTED)))
.build())
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<IndicesAliasesResponse> listener = (ActionListener<IndicesAliasesResponse>) invocationOnMock.getArguments()[1];
listener.onFailure(new RuntimeException("failed"));
return null;
}).when(indicesClient).aliases(any(), any());
SetOnce<Exception> onFailureException = new SetOnce<>();
ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10));
action.execute(index, client, clusterService, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
throw new AssertionError("Unexpected method call");
}
@Override
public void onFailure(Exception e) {
onFailureException.set(e);
}
});
assertThat(onFailureException.get().getMessage(), equalTo("failed"));
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).aliases(any(), any());
}
public void testExecuteWithIssuedResizeRequest() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20));
int numberOfShards = randomIntBetween(1, 5);
int numberOfReplicas = randomIntBetween(1, 5);
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT))
.putAlias(AliasMetaData.builder("my_alias"))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(numberOfReplicas).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(targetIndex).addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true,
ShardRoutingState.STARTED)))
.build())
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Settings expectedSettings = Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true).build();
Mockito.doAnswer(invocationOnMock -> {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocationOnMock.getArguments()[1];
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, index.getName());
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
return null;
}).when(indicesClient).updateSettings(any(), any());
Mockito.doAnswer(invocationOnMock -> {
ResizeRequest request = (ResizeRequest) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
assertThat(request.getSourceIndex(), equalTo(index.getName()));
assertThat(request.getTargetIndexRequest().aliases(), equalTo(Collections.singleton(new Alias("my_alias"))));
assertThat(request.getTargetIndexRequest().settings(), equalTo(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build()));
assertThat(request.getTargetIndexRequest().index(), equalTo(targetIndex.getName()));
ResizeResponse resizeResponse = ResizeAction.INSTANCE.newResponse();
resizeResponse.readFrom(StreamInput.wrap(new byte[] { 1, 1, 1, 1, 1 }));
listener.onResponse(resizeResponse);
return null;
}).when(indicesClient).resizeIndex(any(), any());
SetOnce<Boolean> actionCompleted = new SetOnce<>();
ShrinkAction action = new ShrinkAction(numberOfShards);
action.execute(index, client, clusterService, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertFalse(actionCompleted.get());
Mockito.verify(client, Mockito.atLeast(1)).admin();
Mockito.verify(adminClient, Mockito.atLeast(1)).indices();
Mockito.verify(indicesClient, Mockito.atLeast(1)).updateSettings(any(), any());
Mockito.verify(indicesClient, Mockito.atLeast(1)).resizeIndex(any(), any());
}
public void testExecuteWithIssuedResizeRequestFailure() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Index targetIndex = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(targetIndex).addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true,
ShardRoutingState.STARTED)))
.build())
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
return null;
}).when(indicesClient).updateSettings(any(), any());
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
ResizeResponse resizeResponse = ResizeAction.INSTANCE.newResponse();
resizeResponse.readFrom(StreamInput.wrap(new byte[] { 0, 1, 1, 1, 1 }));
listener.onResponse(resizeResponse);
return null;
}).when(indicesClient).resizeIndex(any(), any());
SetOnce<Exception> exceptionReturned = new SetOnce<>();
ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10));
action.execute(index, client, clusterService, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
throw new AssertionError("Unexpected method call to onSuccess");
}
@Override
public void onFailure(Exception e) {
exceptionReturned.set(e);
}
});
assertThat(exceptionReturned.get().getMessage(), equalTo("Shrink request failed to be acknowledged"));
Mockito.verify(client, Mockito.atLeast(1)).admin();
Mockito.verify(adminClient, Mockito.atLeast(1)).indices();
Mockito.verify(indicesClient, Mockito.atLeast(1)).updateSettings(any(), any());
Mockito.verify(indicesClient, Mockito.atLeast(1)).resizeIndex(any(), any());
}
public void testExecuteWithAllShardsAllocatedAndShrunkenIndexSetting() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20));
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT).put("index.lifecycle.phase", "phase1")
.put("index.lifecycle.action", "action1").put("index.lifecycle.name", "test"))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
IndexMetaData targetIndexMetaData = IndexMetaData.builder(targetIndex.getName())
.settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, index.getName()))
.numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(indexMetadata.getNumberOfReplicas()).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder()
.fPut(index.getName(), indexMetadata).fPut(targetIndex.getName(), targetIndexMetaData);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(targetIndex).addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true,
ShardRoutingState.STARTED)))
.build())
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Settings expectedSettings = Settings.builder().put("index.lifecycle.name", "test")
.put("index.lifecycle.phase", "phase1").put("index.lifecycle.action", "action1").build();
Mockito.doAnswer(invocationOnMock -> {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocationOnMock.getArguments()[1];
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, targetIndex.getName());
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
return null;
}).when(indicesClient).updateSettings(any(), any());
Mockito.doAnswer(invocationOnMock -> {
DeleteIndexRequest request = (DeleteIndexRequest) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<DeleteIndexResponse> listener = (ActionListener<DeleteIndexResponse>) invocationOnMock.getArguments()[1];
assertNotNull(request);
assertEquals(1, request.indices().length);
assertEquals(index.getName(), request.indices()[0]);
listener.onResponse(null);
return null;
}).when(indicesClient).delete(any(), any());
SetOnce<Boolean> actionCompleted = new SetOnce<>();
ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10));
action.execute(index, client, clusterService, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertFalse(actionCompleted.get());
Mockito.verify(client, Mockito.atLeast(1)).admin();
Mockito.verify(adminClient, Mockito.atLeast(1)).indices();
Mockito.verify(indicesClient, Mockito.atLeast(1)).updateSettings(any(), any());
}
public void testExecuteWithAllShardsAllocatedAndShrunkenIndexConfigured() {
String lifecycleName = randomAlphaOfLengthBetween(5, 10);
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20));
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT).put("index.lifecycle.phase", "phase1")
.put("index.lifecycle.action", "action1").put("index.lifecycle.name", lifecycleName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
IndexMetaData targetIndexMetaData = IndexMetaData.builder(targetIndex.getName())
.settings(settings(Version.CURRENT)
.put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, index.getName())
.put(IndexLifecycle.LIFECYCLE_NAME_SETTING.getKey(), lifecycleName))
.numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(indexMetadata.getNumberOfReplicas()).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder()
.fPut(index.getName(), indexMetadata).fPut(targetIndex.getName(), targetIndexMetaData);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(targetIndex).addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true,
ShardRoutingState.STARTED)))
.build())
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
SetOnce<Boolean> actionCompleted = new SetOnce<>();
ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10));
action.execute(index, client, clusterService, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertFalse(actionCompleted.get());
Mockito.verifyZeroInteractions(client);
}
public void testExecuteWaitingOnAllShardsActive() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20));
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
IndexMetaData targetIndexMetadata = IndexMetaData.builder(targetIndex.getName())
.settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, index.getName()))
.numberOfShards(1).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata).fPut(targetIndex.getName(), targetIndexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(targetIndex).addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true,
ShardRoutingState.INITIALIZING)))
.build())
.build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
SetOnce<Boolean> actionCompleted = new SetOnce<>();
ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10));
action.execute(index, client, clusterService, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertFalse(actionCompleted.get());
}
public void testExecuteIndexAlreadyExists() {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20));
ClusterService clusterService = Mockito.mock(ClusterService.class);
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
IndexMetaData targetIndexMetadata = IndexMetaData.builder(targetIndex.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata).fPut(targetIndex.getName(), targetIndexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.metaData(MetaData.builder().indices(indices.build())).build();
Mockito.when(clusterService.state()).thenReturn(clusterState);
Client client = Mockito.mock(Client.class);
SetOnce<Exception> actionFailed = new SetOnce<>();
ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10));
action.execute(index, client, clusterService, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
throw new AssertionError("Unexpected method call to onSuccess");
}
@Override
public void onFailure(Exception e) {
actionFailed.set(e);
}
});
assertThat(actionFailed.get().getMessage(), equalTo("Cannot shrink index [" + index.getName() + "]" +
" because target index [" + targetIndex.getName() + "] already exists."));
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
@ -33,7 +34,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(1);
private static final ReplicasAction TEST_REPLICAS_ACTION = new ReplicasAction(1);
private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction("", new ByteSizeValue(1), null, null);
private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction();
private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction(1);
public void testGetFirstPhase() {
Map<String, Phase> phases = new HashMap<>();