From 228fc8c84281b8fbd40fc38ac90c6e007b9cfa82 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 5 Apr 2018 17:21:44 -0700 Subject: [PATCH] add some stuff --- .../core/indexlifecycle/AllocateAction.java | 95 +++---------------- .../indexlifecycle/AllocationRoutedStep.java | 55 +++++++++++ .../core/indexlifecycle/DeleteAction.java | 10 +- ...teAsyncActionStep.java => DeleteStep.java} | 4 +- .../indexlifecycle/EnoughShardsWaitStep.java | 22 +++++ .../core/indexlifecycle/ForceMergeAction.java | 16 +--- .../core/indexlifecycle/PhaseAfterStep.java | 7 -- .../core/indexlifecycle/SegmentCountStep.java | 31 ++++++ .../UpdateAllocationSettingsStep.java | 57 +++++++++++ .../AllocationRoutedStepTests.java | 51 ++++++++++ .../core/indexlifecycle/DeleteStepTests.java | 15 +++ .../EnoughShardsWaitStepTests.java | 87 +++++++++++++++++ .../InitializePolicyContextStepTests.java | 15 +++ .../core/indexlifecycle/MockActionTests.java | 1 - .../indexlifecycle/PhaseAfterStepTests.java | 15 +++ .../indexlifecycle/ReadOnlyStepTests.java | 15 +++ .../indexlifecycle/SegmentCountStepTests.java | 15 +++ .../UpdateAllocationSettingsStepTests.java | 75 +++++++++++++++ 18 files changed, 471 insertions(+), 115 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStep.java rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/{DeleteAsyncActionStep.java => DeleteStep.java} (84%) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/EnoughShardsWaitStep.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateAllocationSettingsStep.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStepTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStepTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/EnoughShardsWaitStepTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/InitializePolicyContextStepTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStepTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ReadOnlyStepTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateAllocationSettingsStepTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocateAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocateAction.java index ee28a0ca1d2..040374afda0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocateAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocateAction.java @@ -6,17 +6,9 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; -import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -27,8 +19,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.Index; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; import java.io.IOException; import java.util.Arrays; @@ -36,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.LongSupplier; public class AllocateAction implements LifecycleAction { @@ -46,6 +36,7 @@ public class AllocateAction implements LifecycleAction { public static final ParseField REQUIRE_FIELD = new ParseField("require"); private static final Logger logger = ESLoggerFactory.getLogger(AllocateAction.class); + @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, a -> new AllocateAction((Map) a[0], (Map) a[1], (Map) a[2])); @@ -59,7 +50,9 @@ public class AllocateAction implements LifecycleAction { private final Map include; private final Map exclude; private final Map require; - private AllocationDeciders allocationDeciders; + private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Settings.EMPTY, + Collections.singletonList(new FilterAllocationDecider(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)))); public static AllocateAction parse(XContentParser parser) { return PARSER.apply(parser, null); @@ -86,9 +79,6 @@ public class AllocateAction implements LifecycleAction { "At least one of " + INCLUDE_FIELD.getPreferredName() + ", " + EXCLUDE_FIELD.getPreferredName() + " or " + REQUIRE_FIELD.getPreferredName() + "must contain attributes for action " + NAME); } - FilterAllocationDecider decider = new FilterAllocationDecider(Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - this.allocationDeciders = new AllocationDeciders(Settings.EMPTY, Collections.singletonList(decider)); } @SuppressWarnings("unchecked") @@ -131,75 +121,16 @@ public class AllocateAction implements LifecycleAction { return builder; } - /** - * Inspects the existingSettings and adds any attributes that - * are missing for the given settingsPrefix to the - * newSettingsBuilder. - */ - private void addMissingAttrs(Map newAttrs, String settingPrefix, Settings existingSettings, - Settings.Builder newSettingsBuilder) { - newAttrs.entrySet().stream().filter(e -> { - String existingValue = existingSettings.get(settingPrefix + e.getKey()); - return existingValue == null || (existingValue.equals(e.getValue()) == false); - }).forEach(e -> newSettingsBuilder.put(settingPrefix + e.getKey(), e.getValue())); - } - -// public static ConditionalWaitStep getAllocationCheck(AllocationDeciders allocationDeciders, String phase, String index) { -// return new ConditionalWaitStep("wait_allocation", NAME, -// phase, index, (clusterState) -> { -// // We only want to make progress if all shards are active so check that first -// if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index) == false) { -// logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active", NAME, -// index); -// return false; -// } -// -// // All the allocation attributes are already set so just need to -// // check if the allocation has happened -// RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState.getRoutingNodes(), clusterState, null, -// System.nanoTime()); -// int allocationPendingShards = 0; -// List allShards = clusterState.getRoutingTable().allShards(index); -// for (ShardRouting shardRouting : allShards) { -// assert shardRouting.active() : "Shard not active, found " + shardRouting.state() + "for shard with id: " -// + shardRouting.shardId(); -// String currentNodeId = shardRouting.currentNodeId(); -// boolean canRemainOnCurrentNode = allocationDeciders.canRemain(shardRouting, -// clusterState.getRoutingNodes().node(currentNodeId), allocation).type() == Decision.Type.YES; -// if (canRemainOnCurrentNode == false) { -// allocationPendingShards++; -// } -// } -// if (allocationPendingShards > 0) { -// logger.debug("[{}] lifecycle action for index [{}] waiting for [{}] shards " -// + "to be allocated to nodes matching the given filters", NAME, index, allocationPendingShards); -// return false; -// } else { -// logger.debug("[{}] lifecycle action for index [{}] complete", NAME, index); -// return true; -// } -// }); -// } @Override - public List toSteps(Client client, String phase, Step.StepKey nextStepKey) { -// ClusterStateUpdateStep updateAllocationSettings = new ClusterStateUpdateStep( -// "update_allocation", NAME, phase, index.getName(), (clusterState) -> { -// IndexMetaData idxMeta = clusterState.metaData().index(index); -// if (idxMeta == null) { -// return clusterState; -// } -// Settings existingSettings = idxMeta.getSettings(); -// Settings.Builder newSettings = Settings.builder(); -// addMissingAttrs(include, IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey(), existingSettings, newSettings); -// addMissingAttrs(exclude, IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), existingSettings, newSettings); -// addMissingAttrs(require, IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey(), existingSettings, newSettings); -// return ClusterState.builder(clusterState) -// .metaData(MetaData.builder(clusterState.metaData()) -// .updateSettings(newSettings.build(), index.getName())).build(); -// }); - - return Arrays.asList(); + public List toSteps(Client client, String phase, StepKey nextStepKey) { + StepKey enoughKey = new StepKey(phase, NAME, "enough-shards-allocated"); + StepKey allocateKey = new StepKey(phase, NAME, "update-allocation"); + StepKey allocationRoutedKey = new StepKey(phase, NAME, "check-allocation"); + UpdateAllocationSettingsStep allocateStep = new UpdateAllocationSettingsStep(allocateKey, allocationRoutedKey, + include, exclude, require); + AllocationRoutedStep routedCheckStep = new AllocationRoutedStep(allocationRoutedKey, nextStepKey, ALLOCATION_DECIDERS); + return Arrays.asList(new EnoughShardsWaitStep(enoughKey, allocateKey), allocateStep, routedCheckStep); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStep.java new file mode 100644 index 00000000000..aad3aadc7fd --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStep.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.index.Index; + +import java.util.List; + +public class AllocationRoutedStep extends ClusterStateWaitStep { + private static final Logger logger = ESLoggerFactory.getLogger(AllocationRoutedStep.class); + + private AllocationDeciders allocationDeciders; + + AllocationRoutedStep(StepKey key, StepKey nextStepKey, AllocationDeciders allocationDeciders) { + super(key, nextStepKey); + this.allocationDeciders = allocationDeciders; + } + + @Override + public boolean isConditionMet(Index index, ClusterState clusterState) { + // All the allocation attributes are already set so just need to check if the allocation has happened + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState.getRoutingNodes(), clusterState, null, + System.nanoTime()); + int allocationPendingShards = 0; + List allShards = clusterState.getRoutingTable().allShards(index.getName()); + for (ShardRouting shardRouting : allShards) { + assert shardRouting.active() : "Shard not active, found " + shardRouting.state() + "for shard with id: " + + shardRouting.shardId(); + String currentNodeId = shardRouting.currentNodeId(); + boolean canRemainOnCurrentNode = allocationDeciders.canRemain(shardRouting, + clusterState.getRoutingNodes().node(currentNodeId), allocation).type() == Decision.Type.YES; + if (canRemainOnCurrentNode == false) { + allocationPendingShards++; + } + } + if (allocationPendingShards > 0) { + logger.debug("[{}] lifecycle action for index [{}] waiting for [{}] shards " + + "to be allocated to nodes matching the given filters", getKey().getAction(), index, allocationPendingShards); + return false; + } else { + logger.debug("[{}] lifecycle action for index [{}] complete", getKey().getAction(), index); + return true; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAction.java index 145ee76957a..9465b289688 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAction.java @@ -6,11 +6,7 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -18,14 +14,10 @@ import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.Index; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.function.LongSupplier; /** * A {@link LifecycleAction} which deletes the index. @@ -65,7 +57,7 @@ public class DeleteAction implements LifecycleAction { @Override public List toSteps(Client client, String phase, Step.StepKey nextStepKey) { Step.StepKey deleteStepKey = new Step.StepKey(phase, NAME, "delete-step"); - return Collections.singletonList(new DeleteAsyncActionStep(deleteStepKey, nextStepKey, client)); + return Collections.singletonList(new DeleteStep(deleteStepKey, nextStepKey, client)); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAsyncActionStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStep.java similarity index 84% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAsyncActionStep.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStep.java index df89af4ee3d..5181c79d885 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAsyncActionStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStep.java @@ -9,9 +9,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.index.Index; -public class DeleteAsyncActionStep extends AsyncActionStep { +public class DeleteStep extends AsyncActionStep { - public DeleteAsyncActionStep(StepKey key, StepKey nextStepKey, Client client) { + public DeleteStep(StepKey key, StepKey nextStepKey, Client client) { super(key, nextStepKey, client); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/EnoughShardsWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/EnoughShardsWaitStep.java new file mode 100644 index 00000000000..9e3de80d440 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/EnoughShardsWaitStep.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.index.Index; + +public class EnoughShardsWaitStep extends ClusterStateWaitStep { + public EnoughShardsWaitStep(StepKey key, StepKey nextStepKey) { + super(key, nextStepKey); + } + + @Override + public boolean isConditionMet(Index index, ClusterState clusterState) { + // We only want to make progress if all shards are active + return ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeAction.java index 72a203964f5..e839ed09eb9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeAction.java @@ -98,13 +98,7 @@ public class ForceMergeAction implements LifecycleAction { @Override public List toSteps(Client client, String phase, Step.StepKey nextStepKey) { -// ClusterStateUpdateStep readOnlyStep = new ClusterStateUpdateStep( -// "read_only", NAME, phase, index.getName(), (currentState) -> { -// Settings readOnlySettings = Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true).build(); -// return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.metaData()) -// .updateSettings(readOnlySettings, index.getName())).build(); -// }); -// + // ClientStep segmentCount = new ClientStep<>( "segment_count", // NAME, phase, index.getName(), // client.admin().indices().prepareSegments(index.getName()), @@ -118,13 +112,7 @@ public class ForceMergeAction implements LifecycleAction { // NAME, phase, index.getName(), // client.admin().indices().prepareForceMerge(index.getName()).setMaxNumSegments(maxNumSegments), // currentState -> false, response -> RestStatus.OK.equals(response.getStatus())); -// -// ClusterStateUpdateStep readWriteStep = new ClusterStateUpdateStep( -// "read_only", NAME, phase, index.getName(), (currentState) -> { -// Settings readOnlySettings = Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, false).build(); -// return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.metaData()) -// .updateSettings(readOnlySettings, index.getName())).build(); -// }); +// just forceMerge... then wait to see whether segmentCount matches condition return Arrays.asList(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java index b309a3a217a..f7227d5f734 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java @@ -28,15 +28,8 @@ public class PhaseAfterStep extends ClusterStateWaitStep { @Override public boolean isConditionMet(Index index, ClusterState clusterState) { IndexMetaData indexMetaData = clusterState.metaData().index(index); - logger.warn("checking phase[" + indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE) + "]" - + " after[" + after + "]"); long lifecycleDate = indexMetaData.getSettings() .getAsLong(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, -1L); - if (lifecycleDate < 0) { - // TODO(talevy): make sure this setting is set before we find ourselves here - logger.warn("index-lifecycle-setting for index" + index.getName() + "] not set"); - lifecycleDate = indexMetaData.getCreationDate(); - } return nowSupplier.getAsLong() >= lifecycleDate + after.getMillis(); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java new file mode 100644 index 00000000000..78dfa986278 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.Index; + +import java.util.Arrays; +import java.util.stream.StreamSupport; + +public class SegmentCountStep extends AsyncWaitStep { + + private final int maxNumSegments; + + public SegmentCountStep(StepKey key, StepKey nextStepKey, Client client, int maxNumSegments) { + super(key, nextStepKey, client); + this.maxNumSegments = maxNumSegments; + } + + @Override + public void evaluateCondition(Index index, Listener listener) { + getClient().admin().indices().prepareSegments(index.getName()).execute(ActionListener.wrap(response -> { + listener.onResponse(StreamSupport.stream(response.getIndices().get(index.getName()).spliterator(), false) + .anyMatch(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments))); + }, listener::onFailure)); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateAllocationSettingsStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateAllocationSettingsStep.java new file mode 100644 index 00000000000..6e23dc90a8d --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateAllocationSettingsStep.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; + +import java.util.Map; + +public class UpdateAllocationSettingsStep extends ClusterStateActionStep { + private final Map include; + private final Map exclude; + private final Map require; + + public UpdateAllocationSettingsStep(StepKey key, StepKey nextStepKey, Map include, + Map exclude, Map require) { + super(key, nextStepKey); + this.include = include; + this.exclude = exclude; + this.require = require; + } + + @Override + public ClusterState performAction(Index index, ClusterState clusterState) { + IndexMetaData idxMeta = clusterState.metaData().index(index); + if (idxMeta == null) { + return clusterState; + } + Settings existingSettings = idxMeta.getSettings(); + Settings.Builder newSettings = Settings.builder(); + addMissingAttrs(include, IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey(), existingSettings, newSettings); + addMissingAttrs(exclude, IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), existingSettings, newSettings); + addMissingAttrs(require, IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey(), existingSettings, newSettings); + return ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .updateSettings(newSettings.build(), index.getName())).build(); + } + + /** + * Inspects the existingSettings and adds any attributes that + * are missing for the given settingsPrefix to the + * newSettingsBuilder. + */ + static void addMissingAttrs(Map newAttrs, String settingPrefix, Settings existingSettings, + Settings.Builder newSettingsBuilder) { + newAttrs.entrySet().stream().filter(e -> { + String existingValue = existingSettings.get(settingPrefix + e.getKey()); + return existingValue == null || (existingValue.equals(e.getValue()) == false); + }).forEach(e -> newSettingsBuilder.put(settingPrefix + e.getKey(), e.getValue())); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStepTests.java new file mode 100644 index 00000000000..fb4bf7829d3 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AllocationRoutedStepTests.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ESTestCase; + +public class AllocationRoutedStepTests extends ESTestCase { + + public void testCanStay() { + + } + + private void assertAllocateStatus(Index index, int shards, int replicas, AllocateAction action, Settings.Builder existingSettings, + Settings.Builder node1Settings, Settings.Builder node2Settings, + IndexRoutingTable.Builder indexRoutingTable, boolean expectComplete) { + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()).settings(existingSettings).numberOfShards(shards) + .numberOfReplicas(replicas).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .nodes(DiscoveryNodes.builder() + .add(DiscoveryNode.createLocal(node1Settings.build(), new TransportAddress(TransportAddress.META_ADDRESS, 9200), + "node1")) + .add(DiscoveryNode.createLocal(node2Settings.build(), new TransportAddress(TransportAddress.META_ADDRESS, 9201), + "node2"))) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, + Sets.newHashSet(FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, + FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, + FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStepTests.java new file mode 100644 index 00000000000..54f70f8b688 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStepTests.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + + +import org.elasticsearch.test.ESTestCase; + +public class DeleteStepTests extends ESTestCase { + + public void test() { + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/EnoughShardsWaitStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/EnoughShardsWaitStepTests.java new file mode 100644 index 00000000000..5a9fa7389cd --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/EnoughShardsWaitStepTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.Node; +import org.elasticsearch.test.ESTestCase; + +public class EnoughShardsWaitStepTests extends ESTestCase { + + public void testConditionMet() { + IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5)) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0).build(); + MetaData metaData = MetaData.builder() + .persistentSettings(settings(Version.CURRENT).build()) + .put(IndexMetaData.builder(indexMetadata)) + .build(); + Index index = indexMetadata.getIndex(); + + String nodeId = randomAlphaOfLength(10); + DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT) + .put(Node.NODE_MASTER_SETTING.getKey(), true).build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9300), nodeId); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build()) + .routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(index).addShard( + TestShardRouting.newShardRouting(new ShardId(index, 0), + nodeId, true, ShardRoutingState.STARTED))) + .build()) + .build(); + + EnoughShardsWaitStep step = new EnoughShardsWaitStep(null, null); + assertTrue(step.isConditionMet(indexMetadata.getIndex(), clusterState)); + } + + public void testConditionNotMet() { + IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5)) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0).build(); + MetaData metaData = MetaData.builder() + .persistentSettings(settings(Version.CURRENT).build()) + .put(IndexMetaData.builder(indexMetadata)) + .build(); + Index index = indexMetadata.getIndex(); + + String nodeId = randomAlphaOfLength(10); + DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT) + .put(Node.NODE_MASTER_SETTING.getKey(), true).build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9300), nodeId); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build()) + .routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(index).addShard( + TestShardRouting.newShardRouting(new ShardId(index, 0), + nodeId, true, ShardRoutingState.INITIALIZING))) + .build()) + .build(); + + EnoughShardsWaitStep step = new EnoughShardsWaitStep(null, null); + assertFalse(step.isConditionMet(indexMetadata.getIndex(), clusterState)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/InitializePolicyContextStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/InitializePolicyContextStepTests.java new file mode 100644 index 00000000000..55b0afe1243 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/InitializePolicyContextStepTests.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + + +import org.elasticsearch.test.ESTestCase; + +public class InitializePolicyContextStepTests extends ESTestCase { + + public void test() { + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/MockActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/MockActionTests.java index c45dff3c863..5ed7b5d26d2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/MockActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/MockActionTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.core.indexlifecycle; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStepTests.java new file mode 100644 index 00000000000..dc1d33139d7 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStepTests.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + + +import org.elasticsearch.test.ESTestCase; + +public class PhaseAfterStepTests extends ESTestCase { + + public void test() { + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ReadOnlyStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ReadOnlyStepTests.java new file mode 100644 index 00000000000..40a73983ed7 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ReadOnlyStepTests.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + + +import org.elasticsearch.test.ESTestCase; + +public class ReadOnlyStepTests extends ESTestCase { + + public void test() { + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java new file mode 100644 index 00000000000..931457208c8 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + + +import org.elasticsearch.test.ESTestCase; + +public class SegmentCountStepTests extends ESTestCase { + + public void test() { + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateAllocationSettingsStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateAllocationSettingsStepTests.java new file mode 100644 index 00000000000..10a9d84f9e1 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateAllocationSettingsStepTests.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; + +public class UpdateAllocationSettingsStepTests extends ESTestCase { + + public void testModify() { + IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5)) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0).build(); + MetaData metaData = MetaData.builder() + .persistentSettings(settings(Version.CURRENT).build()) + .put(IndexMetaData.builder(indexMetadata)) + .build(); + Index index = indexMetadata.getIndex(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build(); + + Map include = + randomBoolean() ? Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)) : Collections.emptyMap(); + Map exclude = + randomBoolean() ? Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)) : Collections.emptyMap(); + Map require = + randomBoolean() ? Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)) : Collections.emptyMap(); + + UpdateAllocationSettingsStep step = new UpdateAllocationSettingsStep(null, null, include, exclude, require); + ClusterState newState = step.performAction(index, clusterState); + assertThat(getRouting(index, newState, IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey()), equalTo(include)); + assertThat(getRouting(index, newState, IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey()), equalTo(exclude)); + assertThat(getRouting(index, newState, IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey()), equalTo(require)); + } + + public void testAddMissingAttr() { + String prefix = randomFrom(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey(), + IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), + IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey()); + Map newAttrs = Collections.singletonMap(randomAlphaOfLength(4), randomAlphaOfLength(5)); + Settings existingSettings = Settings.builder() + .put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "box_type", "foo") + .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + "box_type", "bar") + .put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "box_type", "baz").build(); + Settings.Builder newSettingsBuilder = Settings.builder(); + UpdateAllocationSettingsStep.addMissingAttrs(newAttrs, prefix, existingSettings, newSettingsBuilder); + + Settings.Builder expectedSettingsBuilder = Settings.builder(); + newAttrs.forEach((k, v) -> expectedSettingsBuilder.put(prefix + k, v)); + assertThat(newSettingsBuilder.build(), equalTo(expectedSettingsBuilder.build())); + } + + private Map getRouting(Index index, ClusterState clusterState, String settingPrefix) { + Settings includeSettings = clusterState.metaData().index(index).getSettings() + .getByPrefix(settingPrefix); + return includeSettings.keySet().stream().collect(Collectors.toMap(Function.identity(), includeSettings::get)); + } +}