This adds ILM support for automatically migrating the managed indices between data tiers. This proposal makes use of a MigrateAction that is injected (similar to how the Unfollow action is injected) in phases that don't define index allocation rules using the AllocateAction or don't explicitly define the MigrateAction itself (regardless if it's enabled or disabled). (cherry picked from commit c1746afffd61048d0c12d3a77e6d8191a804ed49) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
db9dd9f7e1
commit
fe1194d58f
|
@ -57,6 +57,9 @@ public class IndexLifecycleNamedXContentProvider implements NamedXContentProvide
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(SetPriorityAction.NAME),
|
||||
SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(MigrateAction.NAME),
|
||||
MigrateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(UnfollowAction.NAME),
|
||||
UnfollowAction::parse)
|
||||
|
|
|
@ -58,9 +58,10 @@ public class LifecyclePolicy implements ToXContentObject {
|
|||
}, PHASES_FIELD);
|
||||
|
||||
ALLOWED_ACTIONS.put("hot", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, RolloverAction.NAME));
|
||||
ALLOWED_ACTIONS.put("warm", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, ForceMergeAction.NAME,
|
||||
ReadOnlyAction.NAME, ShrinkAction.NAME));
|
||||
ALLOWED_ACTIONS.put("cold", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME));
|
||||
ALLOWED_ACTIONS.put("warm", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, MigrateAction.NAME, AllocateAction.NAME,
|
||||
ForceMergeAction.NAME, ReadOnlyAction.NAME, ShrinkAction.NAME));
|
||||
ALLOWED_ACTIONS.put("cold", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, MigrateAction.NAME, AllocateAction.NAME,
|
||||
FreezeAction.NAME));
|
||||
ALLOWED_ACTIONS.put("delete", Sets.newHashSet(DeleteAction.NAME));
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.client.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class MigrateAction implements LifecycleAction, ToXContentObject {
|
||||
public static final String NAME = "migrate";
|
||||
|
||||
public static final ParseField ENABLED_FIELD = new ParseField("enabled");
|
||||
|
||||
private static final ConstructingObjectParser<MigrateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
|
||||
a -> new MigrateAction(a[0] == null ? true : (boolean) a[0]));
|
||||
|
||||
static {
|
||||
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ENABLED_FIELD);
|
||||
}
|
||||
|
||||
public static MigrateAction parse(XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
private final boolean enabled;
|
||||
|
||||
public MigrateAction() {
|
||||
this(true);
|
||||
}
|
||||
|
||||
public MigrateAction(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(ENABLED_FIELD.getPreferredName(), enabled);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(enabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
}
|
|
@ -705,7 +705,7 @@ public class RestHighLevelClientTests extends ESTestCase {
|
|||
|
||||
public void testProvidedNamedXContents() {
|
||||
List<NamedXContentRegistry.Entry> namedXContents = RestHighLevelClient.getProvidedNamedXContents();
|
||||
assertEquals(70, namedXContents.size());
|
||||
assertEquals(71, namedXContents.size());
|
||||
Map<Class<?>, Integer> categories = new HashMap<>();
|
||||
List<String> names = new ArrayList<>();
|
||||
for (NamedXContentRegistry.Entry namedXContent : namedXContents) {
|
||||
|
@ -731,7 +731,7 @@ public class RestHighLevelClientTests extends ESTestCase {
|
|||
assertTrue(names.contains(MeanReciprocalRank.NAME));
|
||||
assertTrue(names.contains(DiscountedCumulativeGain.NAME));
|
||||
assertTrue(names.contains(ExpectedReciprocalRank.NAME));
|
||||
assertEquals(Integer.valueOf(9), categories.get(LifecycleAction.class));
|
||||
assertEquals(Integer.valueOf(10), categories.get(LifecycleAction.class));
|
||||
assertTrue(names.contains(UnfollowAction.NAME));
|
||||
assertTrue(names.contains(AllocateAction.NAME));
|
||||
assertTrue(names.contains(DeleteAction.NAME));
|
||||
|
|
|
@ -362,6 +362,7 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
CreateIndexRequest createIndexRequest = new CreateIndexRequest("my_index-1")
|
||||
.settings(Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put("index.lifecycle.name", "my_policy")
|
||||
.put("index.lifecycle.rollover_alias", "my_alias")
|
||||
.build());
|
||||
|
@ -370,6 +371,7 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
CreateIndexRequest createOtherIndexRequest = new CreateIndexRequest("other_index")
|
||||
.settings(Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.build());
|
||||
client.indices().create(createOtherIndexRequest, RequestOptions.DEFAULT);
|
||||
|
||||
|
@ -624,6 +626,7 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
CreateIndexRequest createIndexRequest = new CreateIndexRequest("my_index")
|
||||
.settings(Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put("index.lifecycle.name", "my_policy")
|
||||
.build());
|
||||
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
|
||||
|
@ -689,6 +692,7 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
CreateIndexRequest createIndexRequest = new CreateIndexRequest("my_index")
|
||||
.settings(Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put("index.lifecycle.name", "my_policy")
|
||||
.build());
|
||||
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.client.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class MigrateActionTests extends AbstractXContentTestCase<MigrateAction> {
|
||||
|
||||
@Override
|
||||
protected MigrateAction doParseInstance(XContentParser parser) throws IOException {
|
||||
return MigrateAction.parse(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MigrateAction createTestInstance() {
|
||||
return randomInstance();
|
||||
}
|
||||
|
||||
static MigrateAction randomInstance() {
|
||||
return new MigrateAction(randomBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -217,7 +217,7 @@ information for the step that's being performed on the index.
|
|||
"message": "Waiting for all shard copies to be active",
|
||||
"shards_left_to_allocate": -1,
|
||||
"all_shards_active": false,
|
||||
"actual_replicas": 2
|
||||
"number_of_replicas": 2
|
||||
},
|
||||
"phase_execution": {
|
||||
"policy": "my_lifecycle3",
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.elasticsearch.xpack.core.ilm.IndexLifecycleFeatureSetUsage;
|
|||
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleAction;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleType;
|
||||
import org.elasticsearch.xpack.core.ilm.MigrateAction;
|
||||
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
|
||||
import org.elasticsearch.xpack.core.ilm.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
|
||||
|
@ -623,6 +624,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
|
||||
// Transforms
|
||||
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.TRANSFORM, TransformFeatureSetUsage::new),
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TransformField.TASK_NAME, TransformTaskParams::new),
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
|
@ -18,19 +17,15 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.allShardsActiveAllocationInfo;
|
||||
import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo;
|
||||
|
||||
/**
|
||||
* Checks whether all shards have been correctly routed in response to an update to the allocation rules for an index.
|
||||
|
@ -41,7 +36,7 @@ public class AllocationRoutedStep extends ClusterStateWaitStep {
|
|||
private static final Logger logger = LogManager.getLogger(AllocationRoutedStep.class);
|
||||
|
||||
private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Collections.singletonList(
|
||||
new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))));
|
||||
new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))));
|
||||
|
||||
AllocationRoutedStep(StepKey key, StepKey nextStepKey) {
|
||||
super(key, nextStepKey);
|
||||
|
@ -57,13 +52,26 @@ public class AllocationRoutedStep extends ClusterStateWaitStep {
|
|||
}
|
||||
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
|
||||
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
|
||||
getKey().getAction(), index.getName());
|
||||
return new Result(false, new Info(idxMeta.getNumberOfReplicas(), -1, false));
|
||||
getKey().getAction(), index.getName());
|
||||
return new Result(false, waitingForActiveShardsAllocationInfo(idxMeta.getNumberOfReplicas()));
|
||||
}
|
||||
int allocationPendingAllShards = getPendingAllocations(index, ALLOCATION_DECIDERS, clusterState);
|
||||
|
||||
if (allocationPendingAllShards > 0) {
|
||||
logger.debug("{} lifecycle action [{}] waiting for [{}] shards to be allocated to nodes matching the given filters",
|
||||
index, getKey().getAction(), allocationPendingAllShards);
|
||||
return new Result(false, allShardsActiveAllocationInfo(idxMeta.getNumberOfReplicas(), allocationPendingAllShards));
|
||||
} else {
|
||||
logger.debug("{} lifecycle action for [{}] complete", index, getKey().getAction());
|
||||
return new Result(true, null);
|
||||
}
|
||||
}
|
||||
|
||||
static int getPendingAllocations(Index index, AllocationDeciders allocationDeciders, ClusterState clusterState) {
|
||||
// All the allocation attributes are already set so just need to check
|
||||
// if the allocation has happened
|
||||
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, clusterState.getRoutingNodes(), clusterState, null,
|
||||
System.nanoTime());
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState.getRoutingNodes(), clusterState, null,
|
||||
System.nanoTime());
|
||||
|
||||
int allocationPendingAllShards = 0;
|
||||
|
||||
|
@ -71,23 +79,15 @@ public class AllocationRoutedStep extends ClusterStateWaitStep {
|
|||
for (ObjectCursor<IndexShardRoutingTable> shardRoutingTable : allShards.values()) {
|
||||
for (ShardRouting shardRouting : shardRoutingTable.value.shards()) {
|
||||
String currentNodeId = shardRouting.currentNodeId();
|
||||
boolean canRemainOnCurrentNode = ALLOCATION_DECIDERS
|
||||
.canRemain(shardRouting, clusterState.getRoutingNodes().node(currentNodeId), allocation)
|
||||
.type() == Decision.Type.YES;
|
||||
boolean canRemainOnCurrentNode = allocationDeciders
|
||||
.canRemain(shardRouting, clusterState.getRoutingNodes().node(currentNodeId), allocation)
|
||||
.type() == Decision.Type.YES;
|
||||
if (canRemainOnCurrentNode == false || shardRouting.started() == false) {
|
||||
allocationPendingAllShards++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (allocationPendingAllShards > 0) {
|
||||
logger.debug("{} lifecycle action [{}] waiting for [{}] shards to be allocated to nodes matching the given filters",
|
||||
index, getKey().getAction(), allocationPendingAllShards);
|
||||
return new Result(false, new Info(idxMeta.getNumberOfReplicas(), allocationPendingAllShards, true));
|
||||
} else {
|
||||
logger.debug("{} lifecycle action for [{}] complete", index, getKey().getAction());
|
||||
return new Result(true, null);
|
||||
}
|
||||
return allocationPendingAllShards;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -105,84 +105,4 @@ public class AllocationRoutedStep extends ClusterStateWaitStep {
|
|||
}
|
||||
return super.equals(obj);
|
||||
}
|
||||
|
||||
public static final class Info implements ToXContentObject {
|
||||
|
||||
private final long actualReplicas;
|
||||
private final long numberShardsLeftToAllocate;
|
||||
private final boolean allShardsActive;
|
||||
private final String message;
|
||||
|
||||
static final ParseField ACTUAL_REPLICAS = new ParseField("actual_replicas");
|
||||
static final ParseField SHARDS_TO_ALLOCATE = new ParseField("shards_left_to_allocate");
|
||||
static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active");
|
||||
static final ParseField MESSAGE = new ParseField("message");
|
||||
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("allocation_routed_step_info",
|
||||
a -> new Info((long) a[0], (long) a[1], (boolean) a[2]));
|
||||
static {
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), ACTUAL_REPLICAS);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_ALLOCATE);
|
||||
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE);
|
||||
PARSER.declareString((i, s) -> {}, MESSAGE);
|
||||
}
|
||||
|
||||
public Info(long actualReplicas, long numberShardsLeftToAllocate, boolean allShardsActive) {
|
||||
this.actualReplicas = actualReplicas;
|
||||
this.numberShardsLeftToAllocate = numberShardsLeftToAllocate;
|
||||
this.allShardsActive = allShardsActive;
|
||||
if (allShardsActive == false) {
|
||||
message = "Waiting for all shard copies to be active";
|
||||
} else {
|
||||
message = "Waiting for [" + numberShardsLeftToAllocate + "] shards "
|
||||
+ "to be allocated to nodes matching the given filters";
|
||||
}
|
||||
}
|
||||
|
||||
public long getActualReplicas() {
|
||||
return actualReplicas;
|
||||
}
|
||||
|
||||
public long getNumberShardsLeftToAllocate() {
|
||||
return numberShardsLeftToAllocate;
|
||||
}
|
||||
|
||||
public boolean allShardsActive() {
|
||||
return allShardsActive;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(MESSAGE.getPreferredName(), message);
|
||||
builder.field(SHARDS_TO_ALLOCATE.getPreferredName(), numberShardsLeftToAllocate);
|
||||
builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive);
|
||||
builder.field(ACTUAL_REPLICAS.getPreferredName(), actualReplicas);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(actualReplicas, numberShardsLeftToAllocate, allShardsActive);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Info other = (Info) obj;
|
||||
return Objects.equals(actualReplicas, other.actualReplicas) &&
|
||||
Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) &&
|
||||
Objects.equals(allShardsActive, other.allShardsActive);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
|
||||
import org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE;
|
||||
import static org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider.INDEX_ROUTING_INCLUDE_SETTING;
|
||||
import static org.elasticsearch.xpack.core.ilm.AllocationRoutedStep.getPendingAllocations;
|
||||
|
||||
/**
|
||||
* Checks whether all shards have been correctly routed in response to updating the allocation rules for an index in order
|
||||
* to migrate the index to a new tier.
|
||||
*/
|
||||
public class DataTierMigrationRoutedStep extends ClusterStateWaitStep {
|
||||
public static final String NAME = "check-migration";
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(DataTierMigrationRoutedStep.class);
|
||||
|
||||
private static final Set<Setting<?>> ALL_CLUSTER_SETTINGS;
|
||||
|
||||
static {
|
||||
Set<Setting<?>> allSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE_SETTING);
|
||||
allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE_SETTING);
|
||||
allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE_SETTING);
|
||||
ALL_CLUSTER_SETTINGS = allSettings;
|
||||
}
|
||||
|
||||
private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(
|
||||
org.elasticsearch.common.collect.List.of(
|
||||
new DataTierAllocationDecider(new ClusterSettings(Settings.EMPTY, ALL_CLUSTER_SETTINGS))
|
||||
)
|
||||
);
|
||||
|
||||
DataTierMigrationRoutedStep(StepKey key, StepKey nextStepKey) {
|
||||
super(key, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRetryable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result isConditionMet(Index index, ClusterState clusterState) {
|
||||
IndexMetadata idxMeta = clusterState.metadata().index(index);
|
||||
if (idxMeta == null) {
|
||||
// Index must have been since deleted, ignore it
|
||||
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), index.getName());
|
||||
return new Result(false, null);
|
||||
}
|
||||
String destinationTier = INDEX_ROUTING_INCLUDE_SETTING.get(idxMeta.getSettings());
|
||||
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
|
||||
logger.debug("[{}] migration of index [{}] to the [{}] tier cannot progress, as not all shards are active",
|
||||
getKey().getAction(), index.getName(), destinationTier);
|
||||
return new Result(false, AllocationInfo.waitingForActiveShardsAllocationInfo(idxMeta.getNumberOfReplicas()));
|
||||
}
|
||||
|
||||
int allocationPendingAllShards = getPendingAllocations(index, ALLOCATION_DECIDERS, clusterState);
|
||||
|
||||
if (allocationPendingAllShards > 0) {
|
||||
boolean targetTierNodeFound = false;
|
||||
for (DiscoveryNode node : clusterState.nodes()) {
|
||||
for (DiscoveryNodeRole role : node.getRoles()) {
|
||||
if (role.roleName().equals(DATA_ROLE.roleName()) || role.roleName().equals(destinationTier)) {
|
||||
targetTierNodeFound = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
String statusMessage = String.format(Locale.ROOT, "%s lifecycle action [%s] waiting for [%s] shards to be moved to the [%s] " +
|
||||
"tier" + (targetTierNodeFound ? "" : " but there are currently no [%s] nodes in the cluster"),
|
||||
index, getKey().getAction(), allocationPendingAllShards, destinationTier, destinationTier);
|
||||
logger.debug(statusMessage);
|
||||
return new Result(false, new AllocationInfo(idxMeta.getNumberOfReplicas(), allocationPendingAllShards, true, statusMessage));
|
||||
} else {
|
||||
logger.debug("[{}] migration of index [{}] to tier [{}] complete", getKey().getAction(), index, destinationTier);
|
||||
return new Result(true, null);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
|
||||
import org.elasticsearch.xpack.core.DataTier;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
|
||||
* {@link org.elasticsearch.xpack.core.DataTier}s.
|
||||
*/
|
||||
public class MigrateAction implements LifecycleAction {
|
||||
public static final String NAME = "migrate";
|
||||
public static final ParseField ENABLED_FIELD = new ParseField("enabled");
|
||||
|
||||
private static final ConstructingObjectParser<MigrateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
|
||||
a -> new MigrateAction(a[0] == null ? true : (boolean) a[0]));
|
||||
|
||||
static {
|
||||
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ENABLED_FIELD);
|
||||
}
|
||||
|
||||
private final boolean enabled;
|
||||
|
||||
public static MigrateAction parse(XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
public MigrateAction() {
|
||||
this(true);
|
||||
}
|
||||
|
||||
public MigrateAction(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public MigrateAction(StreamInput in) throws IOException {
|
||||
this(in.readBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeBoolean(enabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(ENABLED_FIELD.getPreferredName(), enabled);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSafeAction() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
|
||||
if (enabled) {
|
||||
StepKey migrationKey = new StepKey(phase, NAME, NAME);
|
||||
StepKey migrationRoutedKey = new StepKey(phase, NAME, DataTierMigrationRoutedStep.NAME);
|
||||
|
||||
Settings.Builder migrationSettings = Settings.builder();
|
||||
String dataTierName = "data_" + phase;
|
||||
assert DataTier.validTierName(dataTierName) : "invalid data tier name:" + dataTierName;
|
||||
migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE, dataTierName);
|
||||
UpdateSettingsStep updateMigrationSettingStep = new UpdateSettingsStep(migrationKey, migrationRoutedKey, client,
|
||||
migrationSettings.build());
|
||||
DataTierMigrationRoutedStep migrationRoutedStep = new DataTierMigrationRoutedStep(migrationRoutedKey, nextStepKey);
|
||||
return Arrays.asList(updateMigrationSettingStep, migrationRoutedStep);
|
||||
} else {
|
||||
return org.elasticsearch.common.collect.List.of();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(enabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
MigrateAction other = (MigrateAction) obj;
|
||||
return Objects.equals(enabled, other.enabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
}
|
|
@ -27,7 +27,7 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A {@link LifecycleAction} which deletes the index.
|
||||
* A {@link LifecycleAction} which rolls over the index.
|
||||
*/
|
||||
public class RolloverAction implements LifecycleAction {
|
||||
public static final String NAME = "rollover";
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
|
||||
|
@ -19,6 +20,8 @@ import java.util.Objects;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
/**
|
||||
* Represents the lifecycle of an index from creation to deletion. A
|
||||
* {@link TimeseriesLifecycleType} is made up of a set of {@link Phase}s which it will
|
||||
|
@ -40,9 +43,9 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
static final List<String> ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME,
|
||||
ForceMergeAction.NAME);
|
||||
static final List<String> ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME,
|
||||
AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
|
||||
AllocateAction.NAME, MigrateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
|
||||
static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
|
||||
FreezeAction.NAME, SearchableSnapshotAction.NAME);
|
||||
MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME);
|
||||
static final List<String> ORDERED_VALID_FROZEN_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
|
||||
FreezeAction.NAME, SearchableSnapshotAction.NAME);
|
||||
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(WaitForSnapshotAction.NAME, DeleteAction.NAME);
|
||||
|
@ -51,7 +54,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
|
||||
static final Set<String> VALID_FROZEN_ACTIONS = Sets.newHashSet(ORDERED_VALID_FROZEN_ACTIONS);
|
||||
static final Set<String> VALID_DELETE_ACTIONS = Sets.newHashSet(ORDERED_VALID_DELETE_ACTIONS);
|
||||
private static Map<String, Set<String>> ALLOWED_ACTIONS = new HashMap<>();
|
||||
private static final Map<String, Set<String>> ALLOWED_ACTIONS = new HashMap<>();
|
||||
|
||||
static {
|
||||
ALLOWED_ACTIONS.put(HOT_PHASE, VALID_HOT_ACTIONS);
|
||||
|
@ -86,12 +89,33 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
actionMap.put(UnfollowAction.NAME, new UnfollowAction());
|
||||
phase = new Phase(phase.getName(), phase.getMinimumAge(), actionMap);
|
||||
}
|
||||
|
||||
if (shouldInjectMigrateStepForPhase(phase)) {
|
||||
Map<String, LifecycleAction> actionMap = new HashMap<>(phase.getActions());
|
||||
actionMap.put(MigrateAction.NAME, new MigrateAction(true));
|
||||
phase = new Phase(phase.getName(), phase.getMinimumAge(), actionMap);
|
||||
}
|
||||
|
||||
orderedPhases.add(phase);
|
||||
}
|
||||
}
|
||||
return orderedPhases;
|
||||
}
|
||||
|
||||
static boolean shouldInjectMigrateStepForPhase(Phase phase) {
|
||||
AllocateAction allocateAction = (AllocateAction) phase.getActions().get(AllocateAction.NAME);
|
||||
if (allocateAction != null) {
|
||||
if (definesAllocationRules(allocateAction)) {
|
||||
// we won't automatically migrate the data if an allocate action that defines any allocation rule is present
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
MigrateAction migrateAction = (MigrateAction) phase.getActions().get(MigrateAction.NAME);
|
||||
// if the user configured the {@link MigrateAction} already we won't automatically configure it
|
||||
return migrateAction == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNextPhaseName(String currentPhaseName, Map<String, Phase> phases) {
|
||||
int index = VALID_PHASES.indexOf(currentPhaseName);
|
||||
|
@ -121,7 +145,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
throw new IllegalArgumentException("[" + currentPhaseName + "] is not a valid phase for lifecycle type [" + TYPE + "]");
|
||||
} else {
|
||||
// Find the previous phase before `index` that exists in `phases` and return it
|
||||
while (--index >=0) {
|
||||
while (--index >= 0) {
|
||||
String phaseName = VALID_PHASES.get(index);
|
||||
if (phases.containsKey(phaseName)) {
|
||||
return phaseName;
|
||||
|
@ -139,19 +163,19 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
switch (phase.getName()) {
|
||||
case HOT_PHASE:
|
||||
return ORDERED_VALID_HOT_ACTIONS.stream().map(a -> actions.getOrDefault(a, null))
|
||||
.filter(Objects::nonNull).collect(Collectors.toList());
|
||||
.filter(Objects::nonNull).collect(toList());
|
||||
case WARM_PHASE:
|
||||
return ORDERED_VALID_WARM_ACTIONS.stream() .map(a -> actions.getOrDefault(a, null))
|
||||
.filter(Objects::nonNull).collect(Collectors.toList());
|
||||
return ORDERED_VALID_WARM_ACTIONS.stream().map(a -> actions.getOrDefault(a, null))
|
||||
.filter(Objects::nonNull).collect(toList());
|
||||
case COLD_PHASE:
|
||||
return ORDERED_VALID_COLD_ACTIONS.stream().map(a -> actions.getOrDefault(a, null))
|
||||
.filter(Objects::nonNull).collect(Collectors.toList());
|
||||
.filter(Objects::nonNull).collect(toList());
|
||||
case FROZEN_PHASE:
|
||||
return ORDERED_VALID_FROZEN_ACTIONS.stream().map(a -> actions.getOrDefault(a, null))
|
||||
.filter(Objects::nonNull).collect(Collectors.toList());
|
||||
case DELETE_PHASE:
|
||||
return ORDERED_VALID_DELETE_ACTIONS.stream().map(a -> actions.getOrDefault(a, null))
|
||||
.filter(Objects::nonNull).collect(Collectors.toList());
|
||||
.filter(Objects::nonNull).collect(toList());
|
||||
default:
|
||||
throw new IllegalArgumentException("lifecycle type[" + TYPE + "] does not support phase[" + phase.getName() + "]");
|
||||
}
|
||||
|
@ -183,7 +207,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
int index = orderedActionNames.indexOf(currentActionName);
|
||||
if (index < 0) {
|
||||
throw new IllegalArgumentException("[" + currentActionName + "] is not a valid action for phase [" + phase.getName()
|
||||
+ "] in lifecycle type [" + TYPE + "]");
|
||||
+ "] in lifecycle type [" + TYPE + "]");
|
||||
} else {
|
||||
// Find the next action after `index` that exists in the phase and return it
|
||||
while (++index < orderedActionNames.size()) {
|
||||
|
@ -208,7 +232,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
phase.getActions().forEach((actionName, action) -> {
|
||||
if (ALLOWED_ACTIONS.get(phase.getName()).contains(actionName) == false) {
|
||||
throw new IllegalArgumentException("invalid action [" + actionName + "] " +
|
||||
"defined in phase [" + phase.getName() +"]");
|
||||
"defined in phase [" + phase.getName() + "]");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
@ -226,5 +250,24 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
"] action may not be used in the [" + HOT_PHASE +
|
||||
"] phase without an accompanying [" + RolloverAction.NAME + "] action");
|
||||
}
|
||||
|
||||
// look for phases that have the migrate action enabled and also specify allocation rules via the AllocateAction
|
||||
String phasesWithConflictingMigrationActions = phases.stream()
|
||||
.filter(phase -> phase.getActions().containsKey(MigrateAction.NAME) &&
|
||||
((MigrateAction) phase.getActions().get(MigrateAction.NAME)).isEnabled() &&
|
||||
phase.getActions().containsKey(AllocateAction.NAME) &&
|
||||
definesAllocationRules((AllocateAction) phase.getActions().get(AllocateAction.NAME))
|
||||
)
|
||||
.map(Phase::getName)
|
||||
.collect(Collectors.joining(","));
|
||||
if (Strings.hasText(phasesWithConflictingMigrationActions)) {
|
||||
throw new IllegalArgumentException("phases [" + phasesWithConflictingMigrationActions + "] specify an enabled " +
|
||||
MigrateAction.NAME + " action and an " + AllocateAction.NAME + " action with allocation rules. specify only a single " +
|
||||
"data migration in each phase");
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean definesAllocationRules(AllocateAction action) {
|
||||
return action.getRequire().isEmpty() == false || action.getInclude().isEmpty() == false || action.getExclude().isEmpty() == false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.ilm.step.info;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Represents the state of an index's shards allocation, including a user friendly message describing the current state.
|
||||
* It allows to transfer the allocation information to {@link org.elasticsearch.common.xcontent.XContent} using
|
||||
* {@link #toXContent(XContentBuilder, Params)}
|
||||
*/
|
||||
public class AllocationInfo implements ToXContentObject {
|
||||
|
||||
private final long numberOfReplicas;
|
||||
private final long numberShardsLeftToAllocate;
|
||||
private final boolean allShardsActive;
|
||||
private final String message;
|
||||
|
||||
static final ParseField NUMBER_OF_REPLICAS = new ParseField("number_of_replicas");
|
||||
static final ParseField SHARDS_TO_ALLOCATE = new ParseField("shards_left_to_allocate");
|
||||
static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active");
|
||||
static final ParseField MESSAGE = new ParseField("message");
|
||||
static final ConstructingObjectParser<AllocationInfo, Void> PARSER = new ConstructingObjectParser<>("allocation_routed_step_info",
|
||||
a -> new AllocationInfo((long) a[0], (long) a[1], (boolean) a[2], (String) a[3]));
|
||||
|
||||
static {
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_REPLICAS);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_ALLOCATE);
|
||||
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE);
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), MESSAGE);
|
||||
}
|
||||
|
||||
public AllocationInfo(long numberOfReplicas, long numberShardsLeftToAllocate, boolean allShardsActive, String message) {
|
||||
this.numberOfReplicas = numberOfReplicas;
|
||||
this.numberShardsLeftToAllocate = numberShardsLeftToAllocate;
|
||||
this.allShardsActive = allShardsActive;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the AllocationInfo representing a cluster state with a routing table that does not have enough shards active for a
|
||||
* particular index.
|
||||
*/
|
||||
public static AllocationInfo waitingForActiveShardsAllocationInfo(long numReplicas) {
|
||||
return new AllocationInfo(numReplicas, -1, false,
|
||||
"Waiting for all shard copies to be active");
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the AllocationInfo representing a cluster state with a routing table that has all the shards active for a particular index
|
||||
* but there are still {@link #numberShardsLeftToAllocate} left to be allocated.
|
||||
*/
|
||||
public static AllocationInfo allShardsActiveAllocationInfo(long numReplicas, long numberShardsLeftToAllocate) {
|
||||
return new AllocationInfo(numReplicas, numberShardsLeftToAllocate, true, "Waiting for [" + numberShardsLeftToAllocate +
|
||||
"] shards to be allocated to nodes matching the given filters");
|
||||
}
|
||||
|
||||
public long getNumberOfReplicas() {
|
||||
return numberOfReplicas;
|
||||
}
|
||||
|
||||
public long getNumberShardsLeftToAllocate() {
|
||||
return numberShardsLeftToAllocate;
|
||||
}
|
||||
|
||||
public boolean allShardsActive() {
|
||||
return allShardsActive;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(MESSAGE.getPreferredName(), message);
|
||||
builder.field(SHARDS_TO_ALLOCATE.getPreferredName(), numberShardsLeftToAllocate);
|
||||
builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive);
|
||||
builder.field(NUMBER_OF_REPLICAS.getPreferredName(), numberOfReplicas);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(numberOfReplicas, numberShardsLeftToAllocate, allShardsActive);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
AllocationInfo other = (AllocationInfo) obj;
|
||||
return Objects.equals(numberOfReplicas, other.numberOfReplicas) &&
|
||||
Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) &&
|
||||
Objects.equals(message, other.message) &&
|
||||
Objects.equals(allShardsActive, other.allShardsActive);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
}
|
|
@ -31,6 +31,9 @@ import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
|||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.allShardsActiveAllocationInfo;
|
||||
import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo;
|
||||
|
||||
public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRoutedStep> {
|
||||
|
||||
@Override
|
||||
|
@ -117,7 +120,7 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
|||
|
||||
AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey());
|
||||
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, Settings.builder(), indexRoutingTable,
|
||||
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true)));
|
||||
new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1)));
|
||||
}
|
||||
|
||||
public void testExcludeConditionMetOnlyOneCopyAllocated() {
|
||||
|
@ -139,7 +142,7 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
|||
|
||||
AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey());
|
||||
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, Settings.builder(), indexRoutingTable,
|
||||
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true)));
|
||||
new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1)));
|
||||
}
|
||||
|
||||
public void testIncludeConditionMetOnlyOneCopyAllocated() {
|
||||
|
@ -161,7 +164,7 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
|||
|
||||
AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey());
|
||||
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, Settings.builder(), indexRoutingTable,
|
||||
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true)));
|
||||
new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1)));
|
||||
}
|
||||
|
||||
public void testConditionNotMetDueToRelocation() {
|
||||
|
@ -190,7 +193,7 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
|||
|
||||
AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey());
|
||||
assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
|
||||
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 2, true)));
|
||||
new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 2)));
|
||||
}
|
||||
|
||||
public void testExecuteAllocateNotComplete() throws Exception {
|
||||
|
@ -227,7 +230,7 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
|||
|
||||
AllocationRoutedStep step = createRandomInstance();
|
||||
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
|
||||
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true)));
|
||||
new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1)));
|
||||
}
|
||||
|
||||
public void testExecuteAllocateNotCompleteOnlyOneCopyAllocated() throws Exception {
|
||||
|
@ -266,7 +269,7 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
|||
|
||||
AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey());
|
||||
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
|
||||
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true)));
|
||||
new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1)));
|
||||
}
|
||||
|
||||
public void testExecuteAllocateUnassigned() throws Exception {
|
||||
|
@ -304,7 +307,7 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
|||
|
||||
AllocationRoutedStep step = createRandomInstance();
|
||||
assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
|
||||
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, -1, false)));
|
||||
new ClusterStateWaitStep.Result(false, waitingForActiveShardsAllocationInfo(0)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -343,7 +346,7 @@ public class AllocationRoutedStepTests extends AbstractStepTestCase<AllocationRo
|
|||
|
||||
AllocationRoutedStep step = createRandomInstance();
|
||||
assertAllocateStatus(index, 1, 1, step, existingSettings, node1Settings, node2Settings, indexRoutingTable,
|
||||
new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(1, -1, false)));
|
||||
new ClusterStateWaitStep.Result(false, waitingForActiveShardsAllocationInfo(1)));
|
||||
}
|
||||
|
||||
public void testExecuteIndexMissing() throws Exception {
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo.Reason;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.xpack.core.DataTier;
|
||||
import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep.Result;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider.INDEX_ROUTING_INCLUDE_SETTING;
|
||||
import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class DataTierMigrationRoutedStepTests extends AbstractStepTestCase<DataTierMigrationRoutedStep> {
|
||||
|
||||
@Override
|
||||
public DataTierMigrationRoutedStep createRandomInstance() {
|
||||
StepKey stepKey = randomStepKey();
|
||||
StepKey nextStepKey = randomStepKey();
|
||||
|
||||
return new DataTierMigrationRoutedStep(stepKey, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataTierMigrationRoutedStep mutateInstance(DataTierMigrationRoutedStep instance) {
|
||||
StepKey key = instance.getKey();
|
||||
StepKey nextKey = instance.getNextStepKey();
|
||||
|
||||
switch (between(0, 1)) {
|
||||
case 0:
|
||||
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
break;
|
||||
case 1:
|
||||
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Illegal randomisation branch");
|
||||
}
|
||||
|
||||
return new DataTierMigrationRoutedStep(key, nextKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataTierMigrationRoutedStep copyInstance(DataTierMigrationRoutedStep instance) {
|
||||
return new DataTierMigrationRoutedStep(instance.getKey(), instance.getNextStepKey());
|
||||
}
|
||||
|
||||
public void testExecuteWithUnassignedShard() {
|
||||
IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10)).settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1).numberOfReplicas(1).build();
|
||||
Index index = indexMetadata.getIndex();
|
||||
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED))
|
||||
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 1), null, null, true, ShardRoutingState.UNASSIGNED,
|
||||
new UnassignedInfo(randomFrom(Reason.values()), "the shard is intentionally unassigned")));
|
||||
|
||||
ClusterState clusterState =
|
||||
ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build())
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(newNode("node1", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE)))
|
||||
)
|
||||
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
|
||||
.build();
|
||||
DataTierMigrationRoutedStep step = createRandomInstance();
|
||||
Result expectedResult = new Result(false, waitingForActiveShardsAllocationInfo(1));
|
||||
|
||||
Result actualResult = step.isConditionMet(index, clusterState);
|
||||
assertThat(actualResult.isComplete(), is(false));
|
||||
assertThat(actualResult.getInfomationContext(), is(expectedResult.getInfomationContext()));
|
||||
}
|
||||
|
||||
public void testExecuteWithPendingShards() {
|
||||
IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10))
|
||||
.settings(settings(Version.CURRENT).put(INDEX_ROUTING_INCLUDE_SETTING.getKey(), DataTier.DATA_WARM))
|
||||
.numberOfShards(1).numberOfReplicas(0).build();
|
||||
Index index = indexMetadata.getIndex();
|
||||
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED));
|
||||
|
||||
ClusterState clusterState =
|
||||
ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build())
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(newNode("node1", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE)))
|
||||
.add(newNode("node2", Collections.singleton(DataTier.DATA_WARM_NODE_ROLE)))
|
||||
)
|
||||
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
|
||||
.build();
|
||||
DataTierMigrationRoutedStep step = createRandomInstance();
|
||||
Result expectedResult = new Result(false, new AllocationInfo(0, 1, true,
|
||||
"[" + index.getName() + "] lifecycle action [" + step.getKey().getAction() + "] waiting for " +
|
||||
"[1] shards to be moved to the [data_warm] tier")
|
||||
);
|
||||
|
||||
Result actualResult = step.isConditionMet(index, clusterState);
|
||||
assertThat(actualResult.isComplete(), is(false));
|
||||
assertThat(actualResult.getInfomationContext(), is(expectedResult.getInfomationContext()));
|
||||
}
|
||||
|
||||
public void testExecuteWithPendingShardsAndTargetRoleNotPresentInCluster() {
|
||||
IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10))
|
||||
.settings(settings(Version.CURRENT).put(INDEX_ROUTING_INCLUDE_SETTING.getKey(), DataTier.DATA_WARM))
|
||||
.numberOfShards(1).numberOfReplicas(0).build();
|
||||
Index index = indexMetadata.getIndex();
|
||||
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED));
|
||||
|
||||
ClusterState clusterState =
|
||||
ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build())
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(newNode("node1", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE)))
|
||||
)
|
||||
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
|
||||
.build();
|
||||
DataTierMigrationRoutedStep step = createRandomInstance();
|
||||
Result expectedResult = new Result(false, new AllocationInfo(0, 1, true,
|
||||
"[" + index.getName() + "] lifecycle action [" + step.getKey().getAction() + "] waiting for " +
|
||||
"[1] shards to be moved to the [data_warm] tier but there are currently no [data_warm] nodes in the cluster")
|
||||
);
|
||||
|
||||
Result actualResult = step.isConditionMet(index, clusterState);
|
||||
assertThat(actualResult.isComplete(), is(false));
|
||||
assertThat(actualResult.getInfomationContext(), is(expectedResult.getInfomationContext()));
|
||||
}
|
||||
|
||||
public void testExecuteIndexMissing() {
|
||||
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).build();
|
||||
|
||||
DataTierMigrationRoutedStep step = createRandomInstance();
|
||||
|
||||
Result actualResult = step.isConditionMet(index, clusterState);
|
||||
assertThat(actualResult.isComplete(), is(false));
|
||||
assertThat(actualResult.getInfomationContext(), is(nullValue()));
|
||||
}
|
||||
|
||||
public void testExecuteIsComplete() {
|
||||
IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10))
|
||||
.settings(settings(Version.CURRENT).put(INDEX_ROUTING_INCLUDE_SETTING.getKey(), DataTier.DATA_WARM))
|
||||
.numberOfShards(1).numberOfReplicas(0).build();
|
||||
Index index = indexMetadata.getIndex();
|
||||
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node2", true, ShardRoutingState.STARTED));
|
||||
|
||||
ClusterState clusterState =
|
||||
ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build())
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(newNode("node1", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE)))
|
||||
.add(newNode("node2", Collections.singleton(DataTier.DATA_WARM_NODE_ROLE)))
|
||||
)
|
||||
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
|
||||
.build();
|
||||
DataTierMigrationRoutedStep step = createRandomInstance();
|
||||
Result result = step.isConditionMet(index, clusterState);
|
||||
assertThat(result.isComplete(), is(true));
|
||||
assertThat(result.getInfomationContext(), is(nullValue()));
|
||||
}
|
||||
|
||||
public void testExecuteWithGenericDataNodes() {
|
||||
IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10))
|
||||
.settings(settings(Version.CURRENT).put(INDEX_ROUTING_INCLUDE_SETTING.getKey(), DataTier.DATA_WARM))
|
||||
.numberOfShards(1).numberOfReplicas(0).build();
|
||||
Index index = indexMetadata.getIndex();
|
||||
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
|
||||
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED));
|
||||
|
||||
ClusterState clusterState =
|
||||
ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build())
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_ROLE)))
|
||||
)
|
||||
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
|
||||
.build();
|
||||
DataTierMigrationRoutedStep step = createRandomInstance();
|
||||
Result result = step.isConditionMet(index, clusterState);
|
||||
assertThat(result.isComplete(), is(true));
|
||||
assertThat(result.getInfomationContext(), is(nullValue()));
|
||||
}
|
||||
|
||||
private DiscoveryNode newNode(String nodeId, Set<DiscoveryNodeRole> roles) {
|
||||
return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT);
|
||||
}
|
||||
}
|
|
@ -48,6 +48,7 @@ public class LifecyclePolicyMetadataTests extends AbstractSerializingTestCase<Li
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
|
||||
));
|
||||
}
|
||||
|
@ -70,6 +71,7 @@ public class LifecyclePolicyMetadataTests extends AbstractSerializingTestCase<Li
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
));
|
||||
return new NamedXContentRegistry(entries);
|
||||
|
|
|
@ -57,6 +57,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new)
|
||||
));
|
||||
}
|
||||
|
@ -78,6 +79,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME),
|
||||
SearchableSnapshotAction::parse)
|
||||
));
|
||||
|
@ -136,6 +138,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
return new UnfollowAction();
|
||||
case SearchableSnapshotAction.NAME:
|
||||
return new SearchableSnapshotAction(randomAlphaOfLengthBetween(1, 10));
|
||||
case MigrateAction.NAME:
|
||||
return new MigrateAction(false);
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid action [" + action + "]");
|
||||
}};
|
||||
|
@ -194,6 +198,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
return new UnfollowAction();
|
||||
case SearchableSnapshotAction.NAME:
|
||||
return new SearchableSnapshotAction(randomAlphaOfLengthBetween(1, 10));
|
||||
case MigrateAction.NAME:
|
||||
return new MigrateAction(false);
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid action [" + action + "]");
|
||||
}};
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE;
|
||||
|
||||
public class MigrateActionTests extends AbstractActionTestCase<MigrateAction> {
|
||||
|
||||
@Override
|
||||
protected MigrateAction doParseInstance(XContentParser parser) throws IOException {
|
||||
return MigrateAction.parse(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MigrateAction createTestInstance() {
|
||||
return new MigrateAction();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Reader<MigrateAction> instanceReader() {
|
||||
return MigrateAction::new;
|
||||
}
|
||||
|
||||
public void testToSteps() {
|
||||
String phase = randomValueOtherThan(DELETE_PHASE, () -> randomFrom(TimeseriesLifecycleType.VALID_PHASES));
|
||||
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
|
||||
randomAlphaOfLengthBetween(1, 10));
|
||||
{
|
||||
MigrateAction action = new MigrateAction();
|
||||
List<Step> steps = action.toSteps(null, phase, nextStepKey);
|
||||
assertNotNull(steps);
|
||||
assertEquals(2, steps.size());
|
||||
StepKey expectedFirstStepKey = new StepKey(phase, MigrateAction.NAME, MigrateAction.NAME);
|
||||
StepKey expectedSecondStepKey = new StepKey(phase, MigrateAction.NAME, DataTierMigrationRoutedStep.NAME);
|
||||
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
|
||||
DataTierMigrationRoutedStep secondStep = (DataTierMigrationRoutedStep) steps.get(1);
|
||||
assertEquals(expectedFirstStepKey, firstStep.getKey());
|
||||
assertEquals(expectedSecondStepKey, firstStep.getNextStepKey());
|
||||
assertEquals(expectedSecondStepKey, secondStep.getKey());
|
||||
assertEquals(nextStepKey, secondStep.getNextStepKey());
|
||||
}
|
||||
|
||||
{
|
||||
MigrateAction disabledMigrateAction = new MigrateAction(false);
|
||||
List<Step> steps = disabledMigrateAction.toSteps(null, phase, nextStepKey);
|
||||
assertEquals(0, steps.size());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,6 +20,8 @@ import java.util.function.Consumer;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE;
|
||||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE;
|
||||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.ORDERED_VALID_COLD_ACTIONS;
|
||||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.ORDERED_VALID_DELETE_ACTIONS;
|
||||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.ORDERED_VALID_FROZEN_ACTIONS;
|
||||
|
@ -31,8 +33,11 @@ import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.VALID_FRO
|
|||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.VALID_HOT_ACTIONS;
|
||||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.VALID_PHASES;
|
||||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.VALID_WARM_ACTIONS;
|
||||
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.WARM_PHASE;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
||||
|
||||
|
@ -48,6 +53,9 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
private static final SetPriorityAction TEST_PRIORITY_ACTION = new SetPriorityAction(0);
|
||||
private static final UnfollowAction TEST_UNFOLLOW_ACTION = new UnfollowAction();
|
||||
private static final SearchableSnapshotAction TEST_SEARCHABLE_SNAPSHOT_ACTION = new SearchableSnapshotAction("repo");
|
||||
// keeping the migrate action disabled as otherwise it could conflict with the allocate action if both are randomly selected for the
|
||||
// same phase
|
||||
private static final MigrateAction TEST_MIGRATE_ACTION = new MigrateAction(false);
|
||||
|
||||
public void testValidatePhases() {
|
||||
boolean invalid = randomBoolean();
|
||||
|
@ -186,6 +194,28 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testValidateConflictingDataMigrationConfigurations() {
|
||||
Map<String, LifecycleAction> actions = new HashMap<>();
|
||||
actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(true));
|
||||
actions.put(TEST_ALLOCATE_ACTION.getWriteableName(), TEST_ALLOCATE_ACTION);
|
||||
List<Phase> phases = org.elasticsearch.common.collect.List.of(
|
||||
new Phase(WARM_PHASE, TimeValue.ZERO, actions), new Phase(COLD_PHASE, TimeValue.ZERO, actions)
|
||||
);
|
||||
|
||||
Exception validationException = expectThrows(IllegalArgumentException.class,
|
||||
() -> TimeseriesLifecycleType.INSTANCE.validate(phases));
|
||||
assertThat(validationException.getMessage(), equalTo("phases [warm,cold] specify an enabled migrate action and an allocate " +
|
||||
"action with allocation rules. specify only a single data migration in each phase"));
|
||||
|
||||
// disabling the migrate action makes the phases definition valid as only the allocate action will perform data migration
|
||||
actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(false));
|
||||
try {
|
||||
TimeseriesLifecycleType.INSTANCE.validate(phases);
|
||||
} catch (Exception e) {
|
||||
fail("not expecting a failure for phases that specify one action that migrates data" + e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testGetOrderedPhases() {
|
||||
Map<String, Phase> phaseMap = new HashMap<>();
|
||||
for (String phaseName : randomSubsetOf(randomIntBetween(0, VALID_PHASES.size()), VALID_PHASES)) {
|
||||
|
@ -196,6 +226,18 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
assertTrue(isSorted(TimeseriesLifecycleType.INSTANCE.getOrderedPhases(phaseMap), Phase::getName, VALID_PHASES));
|
||||
}
|
||||
|
||||
public void testGetOrderedPhasesInsertsMigrateAction() {
|
||||
Map<String, Phase> phaseMap = new HashMap<>();
|
||||
phaseMap.put(HOT_PHASE, new Phase(HOT_PHASE, TimeValue.ZERO, org.elasticsearch.common.collect.Map.of()));
|
||||
phaseMap.put(WARM_PHASE, new Phase(WARM_PHASE, TimeValue.ZERO, org.elasticsearch.common.collect.Map.of()));
|
||||
|
||||
List<Phase> orderedPhases = TimeseriesLifecycleType.INSTANCE.getOrderedPhases(phaseMap);
|
||||
assertTrue(isSorted(orderedPhases, Phase::getName, VALID_PHASES));
|
||||
Phase warmPhase = orderedPhases.get(1);
|
||||
assertThat(warmPhase, is(notNullValue()));
|
||||
assertThat(warmPhase.getActions().get(MigrateAction.NAME), is(notNullValue()));
|
||||
}
|
||||
|
||||
public void testUnfollowInjections() {
|
||||
assertTrue(isUnfollowInjected("hot", RolloverAction.NAME));
|
||||
assertTrue(isUnfollowInjected("warm", ShrinkAction.NAME));
|
||||
|
@ -590,6 +632,41 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
exception.getMessage());
|
||||
}
|
||||
|
||||
public void testShouldMigrateDataToTiers() {
|
||||
{
|
||||
// the allocate action contain allocation rules
|
||||
Map<String, LifecycleAction> actions = new HashMap<>();
|
||||
actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(false));
|
||||
actions.put(TEST_ALLOCATE_ACTION.getWriteableName(), TEST_ALLOCATE_ACTION);
|
||||
Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions);
|
||||
assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(false));
|
||||
}
|
||||
|
||||
{
|
||||
// the allocate action only specifies the number of replicas
|
||||
Map<String, LifecycleAction> actions = new HashMap<>();
|
||||
actions.put(TEST_ALLOCATE_ACTION.getWriteableName(), new AllocateAction(2, null, null, null));
|
||||
Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions);
|
||||
assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(true));
|
||||
}
|
||||
|
||||
{
|
||||
// there's an enabled migrate action specified
|
||||
Map<String, LifecycleAction> actions = new HashMap<>();
|
||||
actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(true));
|
||||
Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions);
|
||||
assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(false));
|
||||
}
|
||||
|
||||
{
|
||||
// there's a disabled migrate action specified
|
||||
Map<String, LifecycleAction> actions = new HashMap<>();
|
||||
actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(false));
|
||||
Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions);
|
||||
assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(false));
|
||||
}
|
||||
}
|
||||
|
||||
private void assertNextActionName(String phaseName, String currentAction, String expectedNextAction, String... availableActionNames) {
|
||||
Map<String, LifecycleAction> availableActions = convertActionNamesToActions(availableActionNames);
|
||||
Phase phase = new Phase(phaseName, TimeValue.ZERO, availableActions);
|
||||
|
@ -626,7 +703,9 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
case SetPriorityAction.NAME:
|
||||
return new SetPriorityAction(0);
|
||||
case UnfollowAction.NAME:
|
||||
return new UnfollowAction();
|
||||
return new UnfollowAction();
|
||||
case MigrateAction.NAME:
|
||||
return new MigrateAction(true);
|
||||
}
|
||||
return new DeleteAction();
|
||||
}).collect(Collectors.toConcurrentMap(LifecycleAction::getWriteableName, Function.identity()));
|
||||
|
@ -698,6 +777,8 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
return TEST_UNFOLLOW_ACTION;
|
||||
case SearchableSnapshotAction.NAME:
|
||||
return TEST_SEARCHABLE_SNAPSHOT_ACTION;
|
||||
case MigrateAction.NAME:
|
||||
return TEST_MIGRATE_ACTION;
|
||||
default:
|
||||
throw new IllegalArgumentException("unsupported timeseries phase action [" + actionName + "]");
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleAction;
|
|||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyTests;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleType;
|
||||
import org.elasticsearch.xpack.core.ilm.MigrateAction;
|
||||
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
|
||||
import org.elasticsearch.xpack.core.ilm.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
|
||||
|
@ -75,6 +76,7 @@ public class PutLifecycleRequestTests extends AbstractSerializingTestCase<Reques
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new)
|
||||
));
|
||||
}
|
||||
|
@ -97,6 +99,7 @@ public class PutLifecycleRequestTests extends AbstractSerializingTestCase<Reques
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME),
|
||||
SearchableSnapshotAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
));
|
||||
return new NamedXContentRegistry(entries);
|
||||
|
|
|
@ -4,25 +4,24 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
package org.elasticsearch.xpack.core.ilm.step.info;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
import org.elasticsearch.test.EqualsHashCodeTestUtils;
|
||||
import org.elasticsearch.xpack.core.ilm.AllocationRoutedStep.Info;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class AllocationRoutedStepInfoTests extends AbstractXContentTestCase<AllocationRoutedStep.Info> {
|
||||
public class AllocationRoutedStepInfoTests extends AbstractXContentTestCase<AllocationInfo> {
|
||||
|
||||
@Override
|
||||
protected Info createTestInstance() {
|
||||
return new Info(randomNonNegativeLong(), randomNonNegativeLong(), randomBoolean());
|
||||
protected AllocationInfo createTestInstance() {
|
||||
return new AllocationInfo(randomNonNegativeLong(), randomNonNegativeLong(), randomBoolean(), randomAlphaOfLengthBetween(5, 10));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Info doParseInstance(XContentParser parser) throws IOException {
|
||||
return Info.PARSER.apply(parser, null);
|
||||
protected AllocationInfo doParseInstance(XContentParser parser) throws IOException {
|
||||
return AllocationInfo.PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -36,14 +35,16 @@ public class AllocationRoutedStepInfoTests extends AbstractXContentTestCase<Allo
|
|||
}
|
||||
}
|
||||
|
||||
protected final Info copyInstance(Info instance) throws IOException {
|
||||
return new Info(instance.getActualReplicas(), instance.getNumberShardsLeftToAllocate(), instance.allShardsActive());
|
||||
protected final AllocationInfo copyInstance(AllocationInfo instance) {
|
||||
return new AllocationInfo(instance.getNumberOfReplicas(), instance.getNumberShardsLeftToAllocate(), instance.allShardsActive(),
|
||||
instance.getMessage());
|
||||
}
|
||||
|
||||
protected Info mutateInstance(Info instance) throws IOException {
|
||||
long actualReplicas = instance.getActualReplicas();
|
||||
protected AllocationInfo mutateInstance(AllocationInfo instance) throws IOException {
|
||||
long actualReplicas = instance.getNumberOfReplicas();
|
||||
long shardsToAllocate = instance.getNumberShardsLeftToAllocate();
|
||||
boolean allShardsActive = instance.allShardsActive();
|
||||
String message = instance.getMessage();
|
||||
switch (between(0, 2)) {
|
||||
case 0:
|
||||
shardsToAllocate += between(1, 20);
|
||||
|
@ -54,10 +55,13 @@ public class AllocationRoutedStepInfoTests extends AbstractXContentTestCase<Allo
|
|||
case 2:
|
||||
actualReplicas += between(1, 20);
|
||||
break;
|
||||
case 3:
|
||||
message = randomValueOtherThan(message, () -> randomAlphaOfLengthBetween(5, 10));
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Illegal randomisation branch");
|
||||
}
|
||||
return new Info(actualReplicas, shardsToAllocate, allShardsActive);
|
||||
return new AllocationInfo(actualReplicas, shardsToAllocate, allShardsActive, message);
|
||||
}
|
||||
|
||||
}
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.ilm.InitializePolicyContextStep;
|
|||
import org.elasticsearch.xpack.core.ilm.LifecycleAction;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.MigrateAction;
|
||||
import org.elasticsearch.xpack.core.ilm.Phase;
|
||||
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
|
||||
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
|
||||
|
@ -675,8 +676,21 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
request.addParameter("level", "shards");
|
||||
});
|
||||
|
||||
// assign the policy that'll attempt to shrink the index
|
||||
createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(expectedFinalShards));
|
||||
// assign the policy that'll attempt to shrink the index (disabling the migrate action as it'll otherwise wait for
|
||||
// all shards to be active and we want that to happen as part of the shrink action)
|
||||
MigrateAction migrateAction = new MigrateAction(false);
|
||||
ShrinkAction shrinkAction = new ShrinkAction(expectedFinalShards);
|
||||
Phase phase = new Phase("warm", TimeValue.ZERO, org.elasticsearch.common.collect.Map.of(
|
||||
migrateAction.getWriteableName(), migrateAction, shrinkAction.getWriteableName(), shrinkAction)
|
||||
);
|
||||
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, singletonMap(phase.getName(), phase));
|
||||
XContentBuilder builder = jsonBuilder();
|
||||
lifecyclePolicy.toXContent(builder, null);
|
||||
final StringEntity entity = new StringEntity(
|
||||
"{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON);
|
||||
Request putPolicyRequest = new Request("PUT", "_ilm/policy/" + policy);
|
||||
putPolicyRequest.setEntity(entity);
|
||||
client().performRequest(putPolicyRequest);
|
||||
updatePolicy(index, policy);
|
||||
|
||||
assertTrue("ILM did not start retrying the set-single-node-allocation step", waitUntil(() -> {
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ilm;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.xpack.core.DataTier;
|
||||
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.DataTierMigrationRoutedStep;
|
||||
import org.elasticsearch.xpack.core.ilm.ExplainLifecycleRequest;
|
||||
import org.elasticsearch.xpack.core.ilm.ExplainLifecycleResponse;
|
||||
import org.elasticsearch.xpack.core.ilm.IndexLifecycleExplainResponse;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.Phase;
|
||||
import org.elasticsearch.xpack.core.ilm.action.ExplainLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.test.NodeRoles.onlyRole;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
public class DataTiersMigrationsTests extends ESIntegTestCase {
|
||||
|
||||
private String policy;
|
||||
private String managedIndex;
|
||||
|
||||
@Before
|
||||
public void refreshDataStreamAndPolicy() {
|
||||
policy = "policy-" + randomAlphaOfLength(5);
|
||||
managedIndex = "index-" + randomAlphaOfLengthBetween(10, 15).toLowerCase(Locale.ROOT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean ignoreExternalCluster() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
|
||||
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
|
||||
settings.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s");
|
||||
settings.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false);
|
||||
settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false);
|
||||
return settings.build();
|
||||
}
|
||||
@Override
|
||||
protected Settings transportClientSettings() {
|
||||
Settings.Builder settings = Settings.builder().put(super.transportClientSettings());
|
||||
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
|
||||
return settings.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
return nodePlugins();
|
||||
}
|
||||
|
||||
public static Settings hotNode(final Settings settings) {
|
||||
return onlyRole(settings, DataTier.DATA_HOT_NODE_ROLE);
|
||||
}
|
||||
|
||||
public static Settings warmNode(final Settings settings) {
|
||||
return onlyRole(settings, DataTier.DATA_WARM_NODE_ROLE);
|
||||
}
|
||||
|
||||
public static Settings coldNode(final Settings settings) {
|
||||
return onlyRole(settings, DataTier.DATA_COLD_NODE_ROLE);
|
||||
}
|
||||
|
||||
public void testIndexDataTierMigration() throws Exception {
|
||||
internalCluster().startMasterOnlyNodes(1, Settings.EMPTY);
|
||||
logger.info("starting hot data node");
|
||||
internalCluster().startNode(hotNode(Settings.EMPTY));
|
||||
|
||||
Phase hotPhase = new Phase("hot", TimeValue.ZERO, Collections.emptyMap());
|
||||
Phase warmPhase = new Phase("warm", TimeValue.ZERO, Collections.emptyMap());
|
||||
Phase coldPhase = new Phase("cold", TimeValue.ZERO, Collections.emptyMap());
|
||||
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(
|
||||
policy, org.elasticsearch.common.collect.Map.of("hot", hotPhase, "warm", warmPhase, "cold", coldPhase)
|
||||
);
|
||||
PutLifecycleAction.Request putLifecycleRequest = new PutLifecycleAction.Request(lifecyclePolicy);
|
||||
PutLifecycleAction.Response putLifecycleResponse = client().execute(PutLifecycleAction.INSTANCE, putLifecycleRequest).get();
|
||||
assertAcked(putLifecycleResponse);
|
||||
|
||||
Settings settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0).put(LifecycleSettings.LIFECYCLE_NAME, policy).build();
|
||||
CreateIndexResponse res = client().admin().indices().prepareCreate(managedIndex).setSettings(settings).get();
|
||||
assertTrue(res.isAcknowledged());
|
||||
|
||||
assertBusy(() -> {
|
||||
ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex);
|
||||
ExplainLifecycleResponse explainResponse = client().execute(ExplainLifecycleAction.INSTANCE,
|
||||
explainRequest).get();
|
||||
|
||||
IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex);
|
||||
assertThat(indexLifecycleExplainResponse.getPhase(), is("warm"));
|
||||
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
|
||||
});
|
||||
|
||||
logger.info("starting warm data node");
|
||||
internalCluster().startNode(warmNode(Settings.EMPTY));
|
||||
assertBusy(() -> {
|
||||
ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex);
|
||||
ExplainLifecycleResponse explainResponse = client().execute(ExplainLifecycleAction.INSTANCE,
|
||||
explainRequest).get();
|
||||
|
||||
IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex);
|
||||
assertThat(indexLifecycleExplainResponse.getPhase(), is("cold"));
|
||||
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
|
||||
});
|
||||
|
||||
logger.info("starting cold data node");
|
||||
internalCluster().startNode(coldNode(Settings.EMPTY));
|
||||
|
||||
// wait for lifecycle to complete in the cold phase after the index has been migrated to the cold node
|
||||
assertBusy(() -> {
|
||||
ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex);
|
||||
ExplainLifecycleResponse explainResponse = client().execute(ExplainLifecycleAction.INSTANCE,
|
||||
explainRequest).get();
|
||||
|
||||
IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex);
|
||||
assertThat(indexLifecycleExplainResponse.getPhase(), is("cold"));
|
||||
assertThat(indexLifecycleExplainResponse.getStep(), is("complete"));
|
||||
});
|
||||
}
|
||||
}
|
|
@ -46,6 +46,7 @@ import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
|
|||
import org.elasticsearch.xpack.core.ilm.LifecycleAction;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleType;
|
||||
import org.elasticsearch.xpack.core.ilm.MigrateAction;
|
||||
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
|
||||
import org.elasticsearch.xpack.core.ilm.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
|
||||
|
@ -249,7 +250,9 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(WaitForSnapshotAction.NAME),
|
||||
WaitForSnapshotAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME),
|
||||
SearchableSnapshotAction::parse)
|
||||
SearchableSnapshotAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME),
|
||||
MigrateAction::parse)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleAction;
|
|||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleType;
|
||||
import org.elasticsearch.xpack.core.ilm.MigrateAction;
|
||||
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||
import org.elasticsearch.xpack.core.ilm.Phase;
|
||||
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
|
||||
|
@ -91,6 +92,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new)
|
||||
));
|
||||
}
|
||||
|
@ -112,6 +114,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME),
|
||||
SearchableSnapshotAction::parse)
|
||||
));
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
|
|||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyTests;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.MigrateAction;
|
||||
import org.elasticsearch.xpack.core.ilm.MockAction;
|
||||
import org.elasticsearch.xpack.core.ilm.MockStep;
|
||||
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||
|
@ -171,7 +172,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Phase phase = policy.getPhases().get(phaseName);
|
||||
PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong());
|
||||
String phaseJson = Strings.toString(phaseExecutionInfo);
|
||||
LifecycleAction action = randomFrom(phase.getActions().values());
|
||||
LifecycleAction action = randomValueOtherThan(new MigrateAction(false), () -> randomFrom(phase.getActions().values()));
|
||||
Step step = randomFrom(action.toSteps(new NoOpClient(threadPool), phaseName, null));
|
||||
StepKey stepKey = step.getKey();
|
||||
|
||||
|
@ -729,7 +730,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
Phase phase = policy.getPhases().get(phaseName);
|
||||
PhaseExecutionInfo pei = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong());
|
||||
String phaseJson = Strings.toString(pei);
|
||||
LifecycleAction action = randomFrom(phase.getActions().values());
|
||||
LifecycleAction action = randomValueOtherThan(new MigrateAction(false), () -> randomFrom(phase.getActions().values()));
|
||||
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY));
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
|
|||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyTests;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.MigrateAction;
|
||||
import org.elasticsearch.xpack.core.ilm.MockStep;
|
||||
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||
import org.elasticsearch.xpack.core.ilm.Phase;
|
||||
|
@ -91,7 +92,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
|
|||
Phase phase = policy.getPhases().get(phaseName);
|
||||
PhaseExecutionInfo pei = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong());
|
||||
String phaseJson = Strings.toString(pei);
|
||||
LifecycleAction action = randomFrom(phase.getActions().values());
|
||||
LifecycleAction action = randomValueOtherThan(new MigrateAction(false), () -> randomFrom(phase.getActions().values()));
|
||||
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY));
|
||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||
lifecycleState.setPhaseDefinition(phaseJson);
|
||||
|
@ -159,7 +160,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
|
|||
Phase phase = policy.getPhases().get(phaseName);
|
||||
PhaseExecutionInfo pei = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong());
|
||||
String phaseJson = Strings.toString(pei);
|
||||
LifecycleAction action = randomFrom(phase.getActions().values());
|
||||
LifecycleAction action = randomValueOtherThan(new MigrateAction(false), () -> randomFrom(phase.getActions().values()));
|
||||
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY));
|
||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||
lifecycleState.setPhaseDefinition(phaseJson);
|
||||
|
|
Loading…
Reference in New Issue