After we rollover the index we wait for the configured number of shards for the rolled index to become active (based on the index.write.wait_for_active_shards setting which might be present in a template, or otherwise in the default case, for the primaries to become active). This wait might be long due to disk watermarks being tripped, replicas not being able to spring to life due to cluster nodes reconfiguration and others and, the RolloverStep might not complete successfully due to this inherent transient situation, albeit the rolled index having been created. (cherry picked from commit 457a92fb4c68c55976cc3c3e2f00a053dd2eac70) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
7b4c2bfdc4
commit
123266714b
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.indices.rollover;
|
|||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
|
@ -170,6 +171,13 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
|
|||
this.dryRun = dryRun;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the wait for active shards configuration for the rolled index that gets created.
|
||||
*/
|
||||
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
|
||||
createIndexRequest.waitForActiveShards(waitForActiveShards);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds condition to check if the index is at least <code>age</code> old
|
||||
*/
|
||||
|
|
|
@ -139,17 +139,19 @@ public class RolloverAction implements LifecycleAction {
|
|||
|
||||
StepKey waitForRolloverReadyStepKey = new StepKey(phase, NAME, WaitForRolloverReadyStep.NAME);
|
||||
StepKey rolloverStepKey = new StepKey(phase, NAME, RolloverStep.NAME);
|
||||
StepKey waitForActiveShardsKey = new StepKey(phase, NAME, WaitForActiveShardsStep.NAME);
|
||||
StepKey updateDateStepKey = new StepKey(phase, NAME, UpdateRolloverLifecycleDateStep.NAME);
|
||||
StepKey setIndexingCompleteStepKey = new StepKey(phase, NAME, INDEXING_COMPLETE_STEP_NAME);
|
||||
|
||||
WaitForRolloverReadyStep waitForRolloverReadyStep = new WaitForRolloverReadyStep(waitForRolloverReadyStepKey, rolloverStepKey,
|
||||
client, maxSize, maxAge, maxDocs);
|
||||
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, updateDateStepKey, client);
|
||||
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, waitForActiveShardsKey, client);
|
||||
WaitForActiveShardsStep waitForActiveShardsStep = new WaitForActiveShardsStep(waitForActiveShardsKey, updateDateStepKey);
|
||||
UpdateRolloverLifecycleDateStep updateDateStep = new UpdateRolloverLifecycleDateStep(updateDateStepKey, setIndexingCompleteStepKey,
|
||||
System::currentTimeMillis);
|
||||
UpdateSettingsStep setIndexingCompleteStep = new UpdateSettingsStep(setIndexingCompleteStepKey, nextStepKey,
|
||||
client, indexingComplete);
|
||||
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, updateDateStep, setIndexingCompleteStep);
|
||||
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, waitForActiveShardsStep, updateDateStep, setIndexingCompleteStep);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
|
@ -70,6 +71,9 @@ public class RolloverStep extends AsyncActionStep {
|
|||
|
||||
// Calling rollover with no conditions will always roll over the index
|
||||
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null);
|
||||
// We don't wait for active shards when we perform the rollover because the
|
||||
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so
|
||||
rolloverRequest.setWaitForActiveShards(ActiveShardCount.NONE);
|
||||
getClient().admin().indices().rolloverIndex(rolloverRequest,
|
||||
ActionListener.wrap(response -> {
|
||||
assert response.isRolledOver() : "the only way this rollover call should fail is with an exception";
|
||||
|
|
|
@ -0,0 +1,232 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||
import org.elasticsearch.cluster.metadata.AliasOrIndex.Alias;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* After we performed the index rollover we wait for the the configured number of shards for the rolled over index (ie. newly created
|
||||
* index) to become available.
|
||||
*/
|
||||
public class WaitForActiveShardsStep extends ClusterStateWaitStep {
|
||||
|
||||
public static final String NAME = "wait-for-active-shards";
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(WaitForActiveShardsStep.class);
|
||||
|
||||
WaitForActiveShardsStep(StepKey key, StepKey nextStepKey) {
|
||||
super(key, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRetryable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result isConditionMet(Index index, ClusterState clusterState) {
|
||||
IndexMetaData originalIndexMeta = clusterState.metaData().index(index);
|
||||
|
||||
if (originalIndexMeta == null) {
|
||||
String errorMessage = String.format(Locale.ROOT, "[%s] lifecycle action for index [%s] executed but index no longer exists",
|
||||
getKey().getAction(), index.getName());
|
||||
// Index must have been since deleted
|
||||
logger.debug(errorMessage);
|
||||
return new Result(false, new Info(errorMessage));
|
||||
}
|
||||
|
||||
boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(originalIndexMeta.getSettings());
|
||||
if (indexingComplete) {
|
||||
String message = String.format(Locale.ROOT, "index [%s] has lifecycle complete set, skipping [%s]",
|
||||
originalIndexMeta.getIndex().getName(), WaitForActiveShardsStep.NAME);
|
||||
logger.trace(message);
|
||||
return new Result(true, new Info(message));
|
||||
}
|
||||
|
||||
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings());
|
||||
if (Strings.isNullOrEmpty(rolloverAlias)) {
|
||||
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
|
||||
+ "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]");
|
||||
}
|
||||
|
||||
AliasOrIndex aliasOrIndex = clusterState.metaData().getAliasAndIndexLookup().get(rolloverAlias);
|
||||
assert aliasOrIndex.isAlias() : rolloverAlias + " must be an alias but it is an index";
|
||||
|
||||
Alias alias = (Alias) aliasOrIndex;
|
||||
IndexMetaData aliasWriteIndex = alias.getWriteIndex();
|
||||
final String rolledIndexName;
|
||||
final String waitForActiveShardsSettingValue;
|
||||
if (aliasWriteIndex != null) {
|
||||
rolledIndexName = aliasWriteIndex.getIndex().getName();
|
||||
waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards");
|
||||
} else {
|
||||
List<IndexMetaData> indices = alias.getIndices();
|
||||
int maxIndexCounter = -1;
|
||||
IndexMetaData rolledIndexMeta = null;
|
||||
for (IndexMetaData indexMetaData : indices) {
|
||||
int indexNameCounter = parseIndexNameCounter(indexMetaData.getIndex().getName());
|
||||
if (maxIndexCounter < indexNameCounter) {
|
||||
maxIndexCounter = indexNameCounter;
|
||||
rolledIndexMeta = indexMetaData;
|
||||
}
|
||||
}
|
||||
if (rolledIndexMeta == null) {
|
||||
String errorMessage = String.format(Locale.ROOT,
|
||||
"unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", index.getName(),
|
||||
getKey().getAction());
|
||||
|
||||
// Index must have been since deleted
|
||||
logger.debug(errorMessage);
|
||||
return new Result(false, new Info(errorMessage));
|
||||
}
|
||||
rolledIndexName = rolledIndexMeta.getIndex().getName();
|
||||
waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards");
|
||||
}
|
||||
|
||||
ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue);
|
||||
boolean enoughShardsActive = activeShardCount.enoughShardsActive(clusterState, rolledIndexName);
|
||||
|
||||
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(rolledIndexName);
|
||||
int currentActiveShards = 0;
|
||||
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
|
||||
currentActiveShards += shardRouting.value.activeShards().size();
|
||||
}
|
||||
return new Result(enoughShardsActive, new ActiveShardsInfo(currentActiveShards, activeShardCount.toString(), enoughShardsActive));
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the number from the rolled over index name. It also supports the date-math format (ie. index name is wrapped in < and >)
|
||||
* <p>
|
||||
* Eg.
|
||||
* <p>
|
||||
* - For "logs-000002" it'll return 2
|
||||
* - For "<logs-{now/d}-3>" it'll return 3
|
||||
*/
|
||||
static int parseIndexNameCounter(String indexName) {
|
||||
int numberIndex = indexName.lastIndexOf("-");
|
||||
if (numberIndex == -1) {
|
||||
throw new IllegalArgumentException("no - separator found in index name [" + indexName + "]");
|
||||
}
|
||||
try {
|
||||
return Integer.parseInt(indexName.substring(numberIndex + 1, indexName.endsWith(">") ? indexName.length() - 1 :
|
||||
indexName.length()));
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("unable to parse the index name [" + indexName + "] to extract the counter", e);
|
||||
}
|
||||
}
|
||||
|
||||
static final class ActiveShardsInfo implements ToXContentObject {
|
||||
|
||||
private final long currentActiveShardsCount;
|
||||
private final String targetActiveShardsCount;
|
||||
private final boolean enoughShardsActive;
|
||||
private final String message;
|
||||
|
||||
static final ParseField CURRENT_ACTIVE_SHARDS_COUNT = new ParseField("current_active_shards_count");
|
||||
static final ParseField TARGET_ACTIVE_SHARDS_COUNT = new ParseField("target_active_shards_count");
|
||||
static final ParseField ENOUGH_SHARDS_ACTIVE = new ParseField("enough_shards_active");
|
||||
static final ParseField MESSAGE = new ParseField("message");
|
||||
|
||||
ActiveShardsInfo(long currentActiveShardsCount, String targetActiveShardsCount, boolean enoughShardsActive) {
|
||||
this.currentActiveShardsCount = currentActiveShardsCount;
|
||||
this.targetActiveShardsCount = targetActiveShardsCount;
|
||||
this.enoughShardsActive = enoughShardsActive;
|
||||
|
||||
if (enoughShardsActive) {
|
||||
message = "the target of [" + targetActiveShardsCount + "] are active. Don't need to wait anymore";
|
||||
} else {
|
||||
message = "waiting for [" + targetActiveShardsCount + "] shards to become active, but only [" + currentActiveShardsCount +
|
||||
"] are active";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(MESSAGE.getPreferredName(), message);
|
||||
builder.field(CURRENT_ACTIVE_SHARDS_COUNT.getPreferredName(), currentActiveShardsCount);
|
||||
builder.field(TARGET_ACTIVE_SHARDS_COUNT.getPreferredName(), targetActiveShardsCount);
|
||||
builder.field(ENOUGH_SHARDS_ACTIVE.getPreferredName(), enoughShardsActive);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
ActiveShardsInfo info = (ActiveShardsInfo) o;
|
||||
return currentActiveShardsCount == info.currentActiveShardsCount &&
|
||||
enoughShardsActive == info.enoughShardsActive &&
|
||||
Objects.equals(targetActiveShardsCount, info.targetActiveShardsCount) &&
|
||||
Objects.equals(message, info.message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(currentActiveShardsCount, targetActiveShardsCount, enoughShardsActive, message);
|
||||
}
|
||||
}
|
||||
|
||||
static final class Info implements ToXContentObject {
|
||||
|
||||
private final String message;
|
||||
|
||||
static final ParseField MESSAGE = new ParseField("message");
|
||||
|
||||
Info(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(MESSAGE.getPreferredName(), message);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Info info = (Info) o;
|
||||
return Objects.equals(message, info.message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(message);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -77,28 +77,32 @@ public class RolloverActionTests extends AbstractActionTestCase<RolloverAction>
|
|||
RolloverAction action = createTestInstance();
|
||||
String phase = randomAlphaOfLengthBetween(1, 10);
|
||||
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
|
||||
randomAlphaOfLengthBetween(1, 10));
|
||||
randomAlphaOfLengthBetween(1, 10));
|
||||
List<Step> steps = action.toSteps(null, phase, nextStepKey);
|
||||
assertNotNull(steps);
|
||||
assertEquals(4, steps.size());
|
||||
assertEquals(5, steps.size());
|
||||
StepKey expectedFirstStepKey = new StepKey(phase, RolloverAction.NAME, WaitForRolloverReadyStep.NAME);
|
||||
StepKey expectedSecondStepKey = new StepKey(phase, RolloverAction.NAME, RolloverStep.NAME);
|
||||
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
|
||||
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
|
||||
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, WaitForActiveShardsStep.NAME);
|
||||
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
|
||||
StepKey expectedFifthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
|
||||
WaitForRolloverReadyStep firstStep = (WaitForRolloverReadyStep) steps.get(0);
|
||||
RolloverStep secondStep = (RolloverStep) steps.get(1);
|
||||
UpdateRolloverLifecycleDateStep thirdStep = (UpdateRolloverLifecycleDateStep) steps.get(2);
|
||||
UpdateSettingsStep fourthStep = (UpdateSettingsStep) steps.get(3);
|
||||
WaitForActiveShardsStep thirdStep = (WaitForActiveShardsStep) steps.get(2);
|
||||
UpdateRolloverLifecycleDateStep fourthStep = (UpdateRolloverLifecycleDateStep) steps.get(3);
|
||||
UpdateSettingsStep fifthStep = (UpdateSettingsStep) steps.get(4);
|
||||
assertEquals(expectedFirstStepKey, firstStep.getKey());
|
||||
assertEquals(expectedSecondStepKey, secondStep.getKey());
|
||||
assertEquals(expectedThirdStepKey, thirdStep.getKey());
|
||||
assertEquals(expectedFourthStepKey, fourthStep.getKey());
|
||||
assertEquals(expectedFifthStepKey, fifthStep.getKey());
|
||||
assertEquals(secondStep.getKey(), firstStep.getNextStepKey());
|
||||
assertEquals(thirdStep.getKey(), secondStep.getNextStepKey());
|
||||
assertEquals(fourthStep.getKey(), thirdStep.getNextStepKey());
|
||||
assertEquals(fifthStep.getKey(), fourthStep.getNextStepKey());
|
||||
assertEquals(action.getMaxSize(), firstStep.getMaxSize());
|
||||
assertEquals(action.getMaxAge(), firstStep.getMaxAge());
|
||||
assertEquals(action.getMaxDocs(), firstStep.getMaxDocs());
|
||||
assertEquals(nextStepKey, fourthStep.getNextStepKey());
|
||||
assertEquals(nextStepKey, fifthStep.getNextStepKey());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,240 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep.parseIndexNameCounter;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class WaitForActiveShardsTests extends AbstractStepTestCase<WaitForActiveShardsStep> {
|
||||
|
||||
@Override
|
||||
public WaitForActiveShardsStep createRandomInstance() {
|
||||
StepKey stepKey = randomStepKey();
|
||||
StepKey nextStepKey = randomStepKey();
|
||||
|
||||
return new WaitForActiveShardsStep(stepKey, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WaitForActiveShardsStep mutateInstance(WaitForActiveShardsStep instance) {
|
||||
StepKey key = instance.getKey();
|
||||
StepKey nextKey = instance.getNextStepKey();
|
||||
|
||||
switch (between(0, 1)) {
|
||||
case 0:
|
||||
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
break;
|
||||
case 1:
|
||||
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Illegal randomisation branch");
|
||||
}
|
||||
|
||||
return new WaitForActiveShardsStep(key, nextKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WaitForActiveShardsStep copyInstance(WaitForActiveShardsStep instance) {
|
||||
return new WaitForActiveShardsStep(instance.getKey(), instance.getNextStepKey());
|
||||
}
|
||||
|
||||
public void testIsConditionMetThrowsExceptionWhenRolloverAliasIsNotSet() {
|
||||
String alias = randomAlphaOfLength(5);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||
.putAlias(AliasMetaData.builder(alias))
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder().put(indexMetaData, true).build())
|
||||
.build();
|
||||
|
||||
try {
|
||||
createRandomInstance().isConditionMet(indexMetaData.getIndex(), clusterState);
|
||||
fail("expected the invocation to fail");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), is("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
|
||||
+ "] is not set on index [" + indexMetaData.getIndex().getName() + "]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testResultEvaluatedOnWriteIndexAliasWhenExists() {
|
||||
String alias = randomAlphaOfLength(5);
|
||||
IndexMetaData originalIndex = IndexMetaData.builder("index-000000")
|
||||
.putAlias(AliasMetaData.builder(alias).writeIndex(false))
|
||||
.settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(randomIntBetween(0, 5))
|
||||
.build();
|
||||
IndexMetaData rolledIndex = IndexMetaData.builder("index-000001")
|
||||
.putAlias(AliasMetaData.builder(alias).writeIndex(true))
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
|
||||
.put("index.write.wait_for_active_shards", "all")
|
||||
)
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(1)
|
||||
.build();
|
||||
IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndex.getIndex());
|
||||
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node", null, true,
|
||||
ShardRoutingState.STARTED));
|
||||
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node2", null, false,
|
||||
ShardRoutingState.STARTED));
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder().put(originalIndex, true)
|
||||
.put(rolledIndex, true)
|
||||
.build())
|
||||
.routingTable(RoutingTable.builder().add(routingTable.build()).build())
|
||||
.build();
|
||||
|
||||
assertThat("the rolled index has both the primary and the replica shards started so the condition should be met",
|
||||
createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState).isComplete(), is(true));
|
||||
}
|
||||
|
||||
public void testResultEvaluatedOnOnlyIndexTheAliasPointsToIfWriteIndexIsNull() {
|
||||
String alias = randomAlphaOfLength(5);
|
||||
IndexMetaData originalIndex = IndexMetaData.builder("index-000000")
|
||||
.settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(randomIntBetween(0, 5))
|
||||
.build();
|
||||
IndexMetaData rolledIndex = IndexMetaData.builder("index-000001")
|
||||
.putAlias(AliasMetaData.builder(alias).writeIndex(false))
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
|
||||
.put("index.write.wait_for_active_shards", "all")
|
||||
)
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(1)
|
||||
.build();
|
||||
IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndex.getIndex());
|
||||
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node", null, true,
|
||||
ShardRoutingState.STARTED));
|
||||
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node2", null, false,
|
||||
ShardRoutingState.STARTED));
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder().put(originalIndex, true)
|
||||
.put(rolledIndex, true)
|
||||
.build())
|
||||
.routingTable(RoutingTable.builder().add(routingTable.build()).build())
|
||||
.build();
|
||||
|
||||
assertThat("the index the alias is pointing to has both the primary and the replica shards started so the condition should be" +
|
||||
" met", createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState).isComplete(), is(true));
|
||||
}
|
||||
|
||||
public void testResultReportsMeaningfulMessage() throws IOException {
|
||||
String alias = randomAlphaOfLength(5);
|
||||
IndexMetaData originalIndex = IndexMetaData.builder("index-000000")
|
||||
.putAlias(AliasMetaData.builder(alias).writeIndex(false))
|
||||
.settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(randomIntBetween(0, 5))
|
||||
.build();
|
||||
IndexMetaData rolledIndex = IndexMetaData.builder("index-000001")
|
||||
.putAlias(AliasMetaData.builder(alias).writeIndex(true))
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
|
||||
.put("index.write.wait_for_active_shards", "3")
|
||||
)
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(2)
|
||||
.build();
|
||||
IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndex.getIndex());
|
||||
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node", null, true,
|
||||
ShardRoutingState.STARTED));
|
||||
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node2", null, false,
|
||||
ShardRoutingState.STARTED));
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder().put(originalIndex, true)
|
||||
.put(rolledIndex, true)
|
||||
.build())
|
||||
.routingTable(RoutingTable.builder().add(routingTable.build()).build())
|
||||
.build();
|
||||
|
||||
ClusterStateWaitStep.Result result = createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState);
|
||||
assertThat(result.isComplete(), is(false));
|
||||
|
||||
XContentBuilder expected = new WaitForActiveShardsStep.ActiveShardsInfo(2, "3", false).toXContent(JsonXContent.contentBuilder(),
|
||||
ToXContent.EMPTY_PARAMS);
|
||||
String actualResultAsString = Strings.toString(result.getInfomationContext());
|
||||
assertThat(actualResultAsString, is(Strings.toString(expected)));
|
||||
assertThat(actualResultAsString, containsString("waiting for [3] shards to become active, but only [2] are active"));
|
||||
}
|
||||
|
||||
public void testResultReportsErrorMessage() {
|
||||
String alias = randomAlphaOfLength(5);
|
||||
IndexMetaData rolledIndex = IndexMetaData.builder("index-000001")
|
||||
.putAlias(AliasMetaData.builder(alias).writeIndex(true))
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
|
||||
.put("index.write.wait_for_active_shards", "3")
|
||||
)
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(2)
|
||||
.build();
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(MetaData.builder().put(rolledIndex, true).build())
|
||||
.build();
|
||||
|
||||
WaitForActiveShardsStep step = createRandomInstance();
|
||||
ClusterStateWaitStep.Result result = step.isConditionMet(new Index("index-000000", UUID.randomUUID().toString()),
|
||||
clusterState);
|
||||
assertThat(result.isComplete(), is(false));
|
||||
|
||||
String actualResultAsString = Strings.toString(result.getInfomationContext());
|
||||
assertThat(actualResultAsString,
|
||||
containsString("[" + step.getKey().getAction() + "] lifecycle action for index [index-000000] executed but " +
|
||||
"index no longer exists"));
|
||||
}
|
||||
|
||||
public void testParseIndexNameReturnsCounter() {
|
||||
assertThat(parseIndexNameCounter("logs-000003"), is(3));
|
||||
}
|
||||
|
||||
public void testParseIndexNameSupportsDateMathPattern() {
|
||||
assertThat(parseIndexNameCounter("<logs-{now/d}-1>"), is(1));
|
||||
}
|
||||
|
||||
public void testParseIndexNameThrowExceptionWhenNoSeparatorIsPresent() {
|
||||
try {
|
||||
parseIndexNameCounter("testIndexNameWithoutDash");
|
||||
fail("expected to fail as the index name contains no - separator");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), is("no - separator found in index name [testIndexNameWithoutDash]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testParseIndexNameCannotFormatNumber() {
|
||||
try {
|
||||
parseIndexNameCounter("testIndexName-000a2");
|
||||
fail("expected to fail as the index name doesn't end with digits");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), is("unable to parse the index name [testIndexName-000a2] to extract the counter"));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.ilm.Step;
|
|||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
|
||||
import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
|
||||
import org.hamcrest.Matchers;
|
||||
|
@ -1210,6 +1211,41 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY)));
|
||||
}
|
||||
|
||||
public void testWaitForActiveShardsStep() throws Exception {
|
||||
String originalIndex = index + "-000001";
|
||||
String secondIndex = index + "-000002";
|
||||
createIndexWithSettings(originalIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"),
|
||||
true);
|
||||
|
||||
// create policy
|
||||
createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L));
|
||||
// update policy on index
|
||||
updatePolicy(originalIndex, policy);
|
||||
Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
|
||||
createIndexTemplate.setJsonEntity("{" +
|
||||
"\"index_patterns\": [\""+ index + "-*\"], \n" +
|
||||
" \"settings\": {\n" +
|
||||
" \"number_of_shards\": 1,\n" +
|
||||
" \"number_of_replicas\": 142,\n" +
|
||||
" \"index.write.wait_for_active_shards\": \"all\"\n" +
|
||||
" }\n" +
|
||||
"}");
|
||||
client().performRequest(createIndexTemplate);
|
||||
|
||||
// index document to trigger rollover
|
||||
index(client(), originalIndex, "_id", "foo", "bar");
|
||||
assertBusy(() -> assertTrue(indexExists(secondIndex)));
|
||||
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex).getName(), equalTo(WaitForActiveShardsStep.NAME)));
|
||||
|
||||
// reset the number of replicas to 0 so that the second index wait for active shard condition can be met
|
||||
updateIndexSettings(secondIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/50353")
|
||||
public void testHistoryIsWrittenWithSuccess() throws Exception {
|
||||
String index = "success-index";
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ilm.SegmentCountStep;
|
|||
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
|
||||
import org.elasticsearch.xpack.core.ilm.Step;
|
||||
import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
|
||||
import org.elasticsearch.xpack.ilm.IndexLifecycle;
|
||||
|
||||
|
@ -122,6 +123,7 @@ public class TransportPutLifecycleActionTests extends ESTestCase {
|
|||
" }", "phase"),
|
||||
contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME),
|
||||
new Step.StepKey("phase", "rollover", RolloverStep.NAME),
|
||||
new Step.StepKey("phase", "rollover", WaitForActiveShardsStep.NAME),
|
||||
new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME),
|
||||
new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME)));
|
||||
|
||||
|
@ -143,6 +145,7 @@ public class TransportPutLifecycleActionTests extends ESTestCase {
|
|||
" }", "phase"),
|
||||
contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME),
|
||||
new Step.StepKey("phase", "rollover", RolloverStep.NAME),
|
||||
new Step.StepKey("phase", "rollover", WaitForActiveShardsStep.NAME),
|
||||
new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME),
|
||||
new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME),
|
||||
new Step.StepKey("phase", "set_priority", SetPriorityAction.NAME)));
|
||||
|
|
Loading…
Reference in New Issue