diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java index b632b8bcb7b..fdb27a503b1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.indices.rollover; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.ParseField; @@ -170,6 +171,13 @@ public class RolloverRequest extends AcknowledgedRequest implem this.dryRun = dryRun; } + /** + * Sets the wait for active shards configuration for the rolled index that gets created. + */ + public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + createIndexRequest.waitForActiveShards(waitForActiveShards); + } + /** * Adds condition to check if the index is at least age old */ diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverAction.java index ce17efc1e31..bf1cd9e2b9f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverAction.java @@ -139,17 +139,19 @@ public class RolloverAction implements LifecycleAction { StepKey waitForRolloverReadyStepKey = new StepKey(phase, NAME, WaitForRolloverReadyStep.NAME); StepKey rolloverStepKey = new StepKey(phase, NAME, RolloverStep.NAME); + StepKey waitForActiveShardsKey = new StepKey(phase, NAME, WaitForActiveShardsStep.NAME); StepKey updateDateStepKey = new StepKey(phase, NAME, UpdateRolloverLifecycleDateStep.NAME); StepKey setIndexingCompleteStepKey = new StepKey(phase, NAME, INDEXING_COMPLETE_STEP_NAME); WaitForRolloverReadyStep waitForRolloverReadyStep = new WaitForRolloverReadyStep(waitForRolloverReadyStepKey, rolloverStepKey, client, maxSize, maxAge, maxDocs); - RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, updateDateStepKey, client); + RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, waitForActiveShardsKey, client); + WaitForActiveShardsStep waitForActiveShardsStep = new WaitForActiveShardsStep(waitForActiveShardsKey, updateDateStepKey); UpdateRolloverLifecycleDateStep updateDateStep = new UpdateRolloverLifecycleDateStep(updateDateStepKey, setIndexingCompleteStepKey, System::currentTimeMillis); UpdateSettingsStep setIndexingCompleteStep = new UpdateSettingsStep(setIndexingCompleteStepKey, nextStepKey, client, indexingComplete); - return Arrays.asList(waitForRolloverReadyStep, rolloverStep, updateDateStep, setIndexingCompleteStep); + return Arrays.asList(waitForRolloverReadyStep, rolloverStep, waitForActiveShardsStep, updateDateStep, setIndexingCompleteStep); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java index 542ac156a98..d8c11088f5e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -70,6 +71,9 @@ public class RolloverStep extends AsyncActionStep { // Calling rollover with no conditions will always roll over the index RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null); + // We don't wait for active shards when we perform the rollover because the + // {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so + rolloverRequest.setWaitForActiveShards(ActiveShardCount.NONE); getClient().admin().indices().rolloverIndex(rolloverRequest, ActionListener.wrap(response -> { assert response.isRolledOver() : "the only way this rollover call should fail is with an exception"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsStep.java new file mode 100644 index 00000000000..ff480aeea9e --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsStep.java @@ -0,0 +1,232 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ilm; + +import com.carrotsearch.hppc.cursors.IntObjectCursor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.AliasOrIndex.Alias; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Objects; + +/** + * After we performed the index rollover we wait for the the configured number of shards for the rolled over index (ie. newly created + * index) to become available. + */ +public class WaitForActiveShardsStep extends ClusterStateWaitStep { + + public static final String NAME = "wait-for-active-shards"; + + private static final Logger logger = LogManager.getLogger(WaitForActiveShardsStep.class); + + WaitForActiveShardsStep(StepKey key, StepKey nextStepKey) { + super(key, nextStepKey); + } + + @Override + public boolean isRetryable() { + return true; + } + + @Override + public Result isConditionMet(Index index, ClusterState clusterState) { + IndexMetaData originalIndexMeta = clusterState.metaData().index(index); + + if (originalIndexMeta == null) { + String errorMessage = String.format(Locale.ROOT, "[%s] lifecycle action for index [%s] executed but index no longer exists", + getKey().getAction(), index.getName()); + // Index must have been since deleted + logger.debug(errorMessage); + return new Result(false, new Info(errorMessage)); + } + + boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(originalIndexMeta.getSettings()); + if (indexingComplete) { + String message = String.format(Locale.ROOT, "index [%s] has lifecycle complete set, skipping [%s]", + originalIndexMeta.getIndex().getName(), WaitForActiveShardsStep.NAME); + logger.trace(message); + return new Result(true, new Info(message)); + } + + String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings()); + if (Strings.isNullOrEmpty(rolloverAlias)) { + throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS + + "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]"); + } + + AliasOrIndex aliasOrIndex = clusterState.metaData().getAliasAndIndexLookup().get(rolloverAlias); + assert aliasOrIndex.isAlias() : rolloverAlias + " must be an alias but it is an index"; + + Alias alias = (Alias) aliasOrIndex; + IndexMetaData aliasWriteIndex = alias.getWriteIndex(); + final String rolledIndexName; + final String waitForActiveShardsSettingValue; + if (aliasWriteIndex != null) { + rolledIndexName = aliasWriteIndex.getIndex().getName(); + waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards"); + } else { + List indices = alias.getIndices(); + int maxIndexCounter = -1; + IndexMetaData rolledIndexMeta = null; + for (IndexMetaData indexMetaData : indices) { + int indexNameCounter = parseIndexNameCounter(indexMetaData.getIndex().getName()); + if (maxIndexCounter < indexNameCounter) { + maxIndexCounter = indexNameCounter; + rolledIndexMeta = indexMetaData; + } + } + if (rolledIndexMeta == null) { + String errorMessage = String.format(Locale.ROOT, + "unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", index.getName(), + getKey().getAction()); + + // Index must have been since deleted + logger.debug(errorMessage); + return new Result(false, new Info(errorMessage)); + } + rolledIndexName = rolledIndexMeta.getIndex().getName(); + waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards"); + } + + ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue); + boolean enoughShardsActive = activeShardCount.enoughShardsActive(clusterState, rolledIndexName); + + IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(rolledIndexName); + int currentActiveShards = 0; + for (final IntObjectCursor shardRouting : indexRoutingTable.getShards()) { + currentActiveShards += shardRouting.value.activeShards().size(); + } + return new Result(enoughShardsActive, new ActiveShardsInfo(currentActiveShards, activeShardCount.toString(), enoughShardsActive)); + } + + /** + * Parses the number from the rolled over index name. It also supports the date-math format (ie. index name is wrapped in < and >) + *

+ * Eg. + *

+ * - For "logs-000002" it'll return 2 + * - For "<logs-{now/d}-3>" it'll return 3 + */ + static int parseIndexNameCounter(String indexName) { + int numberIndex = indexName.lastIndexOf("-"); + if (numberIndex == -1) { + throw new IllegalArgumentException("no - separator found in index name [" + indexName + "]"); + } + try { + return Integer.parseInt(indexName.substring(numberIndex + 1, indexName.endsWith(">") ? indexName.length() - 1 : + indexName.length())); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("unable to parse the index name [" + indexName + "] to extract the counter", e); + } + } + + static final class ActiveShardsInfo implements ToXContentObject { + + private final long currentActiveShardsCount; + private final String targetActiveShardsCount; + private final boolean enoughShardsActive; + private final String message; + + static final ParseField CURRENT_ACTIVE_SHARDS_COUNT = new ParseField("current_active_shards_count"); + static final ParseField TARGET_ACTIVE_SHARDS_COUNT = new ParseField("target_active_shards_count"); + static final ParseField ENOUGH_SHARDS_ACTIVE = new ParseField("enough_shards_active"); + static final ParseField MESSAGE = new ParseField("message"); + + ActiveShardsInfo(long currentActiveShardsCount, String targetActiveShardsCount, boolean enoughShardsActive) { + this.currentActiveShardsCount = currentActiveShardsCount; + this.targetActiveShardsCount = targetActiveShardsCount; + this.enoughShardsActive = enoughShardsActive; + + if (enoughShardsActive) { + message = "the target of [" + targetActiveShardsCount + "] are active. Don't need to wait anymore"; + } else { + message = "waiting for [" + targetActiveShardsCount + "] shards to become active, but only [" + currentActiveShardsCount + + "] are active"; + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(MESSAGE.getPreferredName(), message); + builder.field(CURRENT_ACTIVE_SHARDS_COUNT.getPreferredName(), currentActiveShardsCount); + builder.field(TARGET_ACTIVE_SHARDS_COUNT.getPreferredName(), targetActiveShardsCount); + builder.field(ENOUGH_SHARDS_ACTIVE.getPreferredName(), enoughShardsActive); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ActiveShardsInfo info = (ActiveShardsInfo) o; + return currentActiveShardsCount == info.currentActiveShardsCount && + enoughShardsActive == info.enoughShardsActive && + Objects.equals(targetActiveShardsCount, info.targetActiveShardsCount) && + Objects.equals(message, info.message); + } + + @Override + public int hashCode() { + return Objects.hash(currentActiveShardsCount, targetActiveShardsCount, enoughShardsActive, message); + } + } + + static final class Info implements ToXContentObject { + + private final String message; + + static final ParseField MESSAGE = new ParseField("message"); + + Info(String message) { + this.message = message; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(MESSAGE.getPreferredName(), message); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Info info = (Info) o; + return Objects.equals(message, info.message); + } + + @Override + public int hashCode() { + return Objects.hash(message); + } + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverActionTests.java index 8feee4d1dd7..8e8b73dabe4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverActionTests.java @@ -77,28 +77,32 @@ public class RolloverActionTests extends AbstractActionTestCase RolloverAction action = createTestInstance(); String phase = randomAlphaOfLengthBetween(1, 10); StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10)); + randomAlphaOfLengthBetween(1, 10)); List steps = action.toSteps(null, phase, nextStepKey); assertNotNull(steps); - assertEquals(4, steps.size()); + assertEquals(5, steps.size()); StepKey expectedFirstStepKey = new StepKey(phase, RolloverAction.NAME, WaitForRolloverReadyStep.NAME); StepKey expectedSecondStepKey = new StepKey(phase, RolloverAction.NAME, RolloverStep.NAME); - StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME); - StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME); + StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, WaitForActiveShardsStep.NAME); + StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME); + StepKey expectedFifthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME); WaitForRolloverReadyStep firstStep = (WaitForRolloverReadyStep) steps.get(0); RolloverStep secondStep = (RolloverStep) steps.get(1); - UpdateRolloverLifecycleDateStep thirdStep = (UpdateRolloverLifecycleDateStep) steps.get(2); - UpdateSettingsStep fourthStep = (UpdateSettingsStep) steps.get(3); + WaitForActiveShardsStep thirdStep = (WaitForActiveShardsStep) steps.get(2); + UpdateRolloverLifecycleDateStep fourthStep = (UpdateRolloverLifecycleDateStep) steps.get(3); + UpdateSettingsStep fifthStep = (UpdateSettingsStep) steps.get(4); assertEquals(expectedFirstStepKey, firstStep.getKey()); assertEquals(expectedSecondStepKey, secondStep.getKey()); assertEquals(expectedThirdStepKey, thirdStep.getKey()); assertEquals(expectedFourthStepKey, fourthStep.getKey()); + assertEquals(expectedFifthStepKey, fifthStep.getKey()); assertEquals(secondStep.getKey(), firstStep.getNextStepKey()); assertEquals(thirdStep.getKey(), secondStep.getNextStepKey()); assertEquals(fourthStep.getKey(), thirdStep.getNextStepKey()); + assertEquals(fifthStep.getKey(), fourthStep.getNextStepKey()); assertEquals(action.getMaxSize(), firstStep.getMaxSize()); assertEquals(action.getMaxAge(), firstStep.getMaxAge()); assertEquals(action.getMaxDocs(), firstStep.getMaxDocs()); - assertEquals(nextStepKey, fourthStep.getNextStepKey()); + assertEquals(nextStepKey, fifthStep.getNextStepKey()); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java new file mode 100644 index 00000000000..615327ea4d7 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java @@ -0,0 +1,240 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ilm; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.ilm.Step.StepKey; + +import java.io.IOException; +import java.util.UUID; + +import static org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep.parseIndexNameCounter; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +public class WaitForActiveShardsTests extends AbstractStepTestCase { + + @Override + public WaitForActiveShardsStep createRandomInstance() { + StepKey stepKey = randomStepKey(); + StepKey nextStepKey = randomStepKey(); + + return new WaitForActiveShardsStep(stepKey, nextStepKey); + } + + @Override + public WaitForActiveShardsStep mutateInstance(WaitForActiveShardsStep instance) { + StepKey key = instance.getKey(); + StepKey nextKey = instance.getNextStepKey(); + + switch (between(0, 1)) { + case 0: + key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + break; + case 1: + nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + break; + default: + throw new AssertionError("Illegal randomisation branch"); + } + + return new WaitForActiveShardsStep(key, nextKey); + } + + @Override + public WaitForActiveShardsStep copyInstance(WaitForActiveShardsStep instance) { + return new WaitForActiveShardsStep(instance.getKey(), instance.getNextStepKey()); + } + + public void testIsConditionMetThrowsExceptionWhenRolloverAliasIsNotSet() { + String alias = randomAlphaOfLength(5); + IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)) + .putAlias(AliasMetaData.builder(alias)) + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(MetaData.builder().put(indexMetaData, true).build()) + .build(); + + try { + createRandomInstance().isConditionMet(indexMetaData.getIndex(), clusterState); + fail("expected the invocation to fail"); + } catch (IllegalStateException e) { + assertThat(e.getMessage(), is("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS + + "] is not set on index [" + indexMetaData.getIndex().getName() + "]")); + } + } + + public void testResultEvaluatedOnWriteIndexAliasWhenExists() { + String alias = randomAlphaOfLength(5); + IndexMetaData originalIndex = IndexMetaData.builder("index-000000") + .putAlias(AliasMetaData.builder(alias).writeIndex(false)) + .settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)) + .numberOfShards(1) + .numberOfReplicas(randomIntBetween(0, 5)) + .build(); + IndexMetaData rolledIndex = IndexMetaData.builder("index-000001") + .putAlias(AliasMetaData.builder(alias).writeIndex(true)) + .settings(settings(Version.CURRENT) + .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias) + .put("index.write.wait_for_active_shards", "all") + ) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndex.getIndex()); + routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node", null, true, + ShardRoutingState.STARTED)); + routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node2", null, false, + ShardRoutingState.STARTED)); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(MetaData.builder().put(originalIndex, true) + .put(rolledIndex, true) + .build()) + .routingTable(RoutingTable.builder().add(routingTable.build()).build()) + .build(); + + assertThat("the rolled index has both the primary and the replica shards started so the condition should be met", + createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState).isComplete(), is(true)); + } + + public void testResultEvaluatedOnOnlyIndexTheAliasPointsToIfWriteIndexIsNull() { + String alias = randomAlphaOfLength(5); + IndexMetaData originalIndex = IndexMetaData.builder("index-000000") + .settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)) + .numberOfShards(1) + .numberOfReplicas(randomIntBetween(0, 5)) + .build(); + IndexMetaData rolledIndex = IndexMetaData.builder("index-000001") + .putAlias(AliasMetaData.builder(alias).writeIndex(false)) + .settings(settings(Version.CURRENT) + .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias) + .put("index.write.wait_for_active_shards", "all") + ) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndex.getIndex()); + routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node", null, true, + ShardRoutingState.STARTED)); + routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node2", null, false, + ShardRoutingState.STARTED)); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(MetaData.builder().put(originalIndex, true) + .put(rolledIndex, true) + .build()) + .routingTable(RoutingTable.builder().add(routingTable.build()).build()) + .build(); + + assertThat("the index the alias is pointing to has both the primary and the replica shards started so the condition should be" + + " met", createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState).isComplete(), is(true)); + } + + public void testResultReportsMeaningfulMessage() throws IOException { + String alias = randomAlphaOfLength(5); + IndexMetaData originalIndex = IndexMetaData.builder("index-000000") + .putAlias(AliasMetaData.builder(alias).writeIndex(false)) + .settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)) + .numberOfShards(1) + .numberOfReplicas(randomIntBetween(0, 5)) + .build(); + IndexMetaData rolledIndex = IndexMetaData.builder("index-000001") + .putAlias(AliasMetaData.builder(alias).writeIndex(true)) + .settings(settings(Version.CURRENT) + .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias) + .put("index.write.wait_for_active_shards", "3") + ) + .numberOfShards(1) + .numberOfReplicas(2) + .build(); + IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndex.getIndex()); + routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node", null, true, + ShardRoutingState.STARTED)); + routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node2", null, false, + ShardRoutingState.STARTED)); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(MetaData.builder().put(originalIndex, true) + .put(rolledIndex, true) + .build()) + .routingTable(RoutingTable.builder().add(routingTable.build()).build()) + .build(); + + ClusterStateWaitStep.Result result = createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState); + assertThat(result.isComplete(), is(false)); + + XContentBuilder expected = new WaitForActiveShardsStep.ActiveShardsInfo(2, "3", false).toXContent(JsonXContent.contentBuilder(), + ToXContent.EMPTY_PARAMS); + String actualResultAsString = Strings.toString(result.getInfomationContext()); + assertThat(actualResultAsString, is(Strings.toString(expected))); + assertThat(actualResultAsString, containsString("waiting for [3] shards to become active, but only [2] are active")); + } + + public void testResultReportsErrorMessage() { + String alias = randomAlphaOfLength(5); + IndexMetaData rolledIndex = IndexMetaData.builder("index-000001") + .putAlias(AliasMetaData.builder(alias).writeIndex(true)) + .settings(settings(Version.CURRENT) + .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias) + .put("index.write.wait_for_active_shards", "3") + ) + .numberOfShards(1) + .numberOfReplicas(2) + .build(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(MetaData.builder().put(rolledIndex, true).build()) + .build(); + + WaitForActiveShardsStep step = createRandomInstance(); + ClusterStateWaitStep.Result result = step.isConditionMet(new Index("index-000000", UUID.randomUUID().toString()), + clusterState); + assertThat(result.isComplete(), is(false)); + + String actualResultAsString = Strings.toString(result.getInfomationContext()); + assertThat(actualResultAsString, + containsString("[" + step.getKey().getAction() + "] lifecycle action for index [index-000000] executed but " + + "index no longer exists")); + } + + public void testParseIndexNameReturnsCounter() { + assertThat(parseIndexNameCounter("logs-000003"), is(3)); + } + + public void testParseIndexNameSupportsDateMathPattern() { + assertThat(parseIndexNameCounter(""), is(1)); + } + + public void testParseIndexNameThrowExceptionWhenNoSeparatorIsPresent() { + try { + parseIndexNameCounter("testIndexNameWithoutDash"); + fail("expected to fail as the index name contains no - separator"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), is("no - separator found in index name [testIndexNameWithoutDash]")); + } + } + + public void testParseIndexNameCannotFormatNumber() { + try { + parseIndexNameCounter("testIndexName-000a2"); + fail("expected to fail as the index name doesn't end with digits"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), is("unable to parse the index name [testIndexName-000a2] to extract the counter")); + } + } +} diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 9e502016b98..0a8b3af9338 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep; +import org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep; import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction; import org.hamcrest.Matchers; @@ -1210,6 +1211,41 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY))); } + public void testWaitForActiveShardsStep() throws Exception { + String originalIndex = index + "-000001"; + String secondIndex = index + "-000002"; + createIndexWithSettings(originalIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"), + true); + + // create policy + createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + // update policy on index + updatePolicy(originalIndex, policy); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "-*\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 142,\n" + + " \"index.write.wait_for_active_shards\": \"all\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + // index document to trigger rollover + index(client(), originalIndex, "_id", "foo", "bar"); + assertBusy(() -> assertTrue(indexExists(secondIndex))); + + assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex).getName(), equalTo(WaitForActiveShardsStep.NAME))); + + // reset the number of replicas to 0 so that the second index wait for active shard condition can be met + updateIndexSettings(secondIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + + assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY))); + } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/50353") public void testHistoryIsWrittenWithSuccess() throws Exception { String index = "success-index"; diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java index f2f67fa281f..1fc93ade959 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ilm.SegmentCountStep; import org.elasticsearch.xpack.core.ilm.SetPriorityAction; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep; +import org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep; import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; import org.elasticsearch.xpack.ilm.IndexLifecycle; @@ -122,6 +123,7 @@ public class TransportPutLifecycleActionTests extends ESTestCase { " }", "phase"), contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME), new Step.StepKey("phase", "rollover", RolloverStep.NAME), + new Step.StepKey("phase", "rollover", WaitForActiveShardsStep.NAME), new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME), new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME))); @@ -143,6 +145,7 @@ public class TransportPutLifecycleActionTests extends ESTestCase { " }", "phase"), contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME), new Step.StepKey("phase", "rollover", RolloverStep.NAME), + new Step.StepKey("phase", "rollover", WaitForActiveShardsStep.NAME), new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME), new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME), new Step.StepKey("phase", "set_priority", SetPriorityAction.NAME)));