Adds API to assign or change the policy for an index (#31277)
* Adds API to assign or change the policy for an index This api will change `index.lifecycle.name` for all indexes in the provided pattern as long as they are not currently in the shrink action. Later changes will loosen this restriction so an index is only reject if it is currently in the shrink action AND the diff between the old and new policy includes changes to the shrink action. Also later changes will detect if the current step is not present in the new policy and move the index onto the next available step * Changes name to SetPolicyForIndexAction Also changes all related Classes and api endpoints * fixes tests
This commit is contained in:
parent
6785391a5f
commit
6c9e0aeca3
File diff suppressed because one or more lines are too long
|
@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.ReplicasAction;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.SetPolicyForIndexAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.DeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.ExplainLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction;
|
||||
|
@ -330,6 +331,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
GetLifecycleAction.INSTANCE,
|
||||
PutLifecycleAction.INSTANCE,
|
||||
ExplainLifecycleAction.INSTANCE,
|
||||
SetPolicyForIndexAction.INSTANCE,
|
||||
MoveToStepAction.INSTANCE,
|
||||
RetryAction.INSTANCE
|
||||
);
|
||||
|
|
|
@ -0,0 +1,197 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class SetPolicyForIndexAction extends Action<SetPolicyForIndexAction.Request, SetPolicyForIndexAction.Response> {
|
||||
public static final SetPolicyForIndexAction INSTANCE = new SetPolicyForIndexAction();
|
||||
public static final String NAME = "indices:admin/xpack/index_lifecycle/set_policy";
|
||||
|
||||
protected SetPolicyForIndexAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetPolicyForIndexAction.Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
public static class Response extends ActionResponse implements ToXContentObject {
|
||||
|
||||
public static final ParseField HAS_FAILURES_FIELD = new ParseField("has_failures");
|
||||
public static final ParseField FAILED_INDEXES_FIELD = new ParseField("failed_indexes");
|
||||
@SuppressWarnings("unchecked")
|
||||
public static final ConstructingObjectParser<Response, Void> PARSER = new ConstructingObjectParser<>(
|
||||
"change_policy_for_index_response", a -> new Response((List<String>) a[0]));
|
||||
static {
|
||||
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), FAILED_INDEXES_FIELD);
|
||||
// Needs to be declared but not used in constructing the response object
|
||||
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), HAS_FAILURES_FIELD);
|
||||
}
|
||||
|
||||
private List<String> failedIndexes;
|
||||
|
||||
public Response() {
|
||||
}
|
||||
|
||||
public Response(List<String> failedIndexes) {
|
||||
if (failedIndexes == null) {
|
||||
throw new IllegalArgumentException(FAILED_INDEXES_FIELD.getPreferredName() + " cannot be null");
|
||||
}
|
||||
this.failedIndexes = failedIndexes;
|
||||
}
|
||||
|
||||
public List<String> getFailedIndexes() {
|
||||
return failedIndexes;
|
||||
}
|
||||
|
||||
public boolean hasFailures() {
|
||||
return failedIndexes.isEmpty() == false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(HAS_FAILURES_FIELD.getPreferredName(), hasFailures());
|
||||
builder.field(FAILED_INDEXES_FIELD.getPreferredName(), failedIndexes);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
failedIndexes = in.readList(StreamInput::readString);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeStringList(failedIndexes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(failedIndexes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Response other = (Response) obj;
|
||||
return Objects.equals(failedIndexes, other.failedIndexes);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
|
||||
|
||||
private String[] indices;
|
||||
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
|
||||
private String policy;
|
||||
|
||||
public Request() {
|
||||
}
|
||||
|
||||
public Request(String policy, String... indices) {
|
||||
if (indices == null) {
|
||||
throw new IllegalArgumentException("indices cannot be null");
|
||||
}
|
||||
if (policy == null) {
|
||||
throw new IllegalArgumentException("policy cannot be null");
|
||||
}
|
||||
this.indices = indices;
|
||||
this.policy = policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Request indices(String... indices) {
|
||||
this.indices = indices;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
public String policy() {
|
||||
return policy;
|
||||
}
|
||||
|
||||
public void indicesOptions(IndicesOptions indicesOptions) {
|
||||
this.indicesOptions = indicesOptions;
|
||||
}
|
||||
|
||||
public IndicesOptions indicesOptions() {
|
||||
return indicesOptions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
indices = in.readStringArray();
|
||||
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
||||
policy = in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeStringArray(indices);
|
||||
indicesOptions.writeIndicesOptions(out);
|
||||
out.writeString(policy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(Arrays.hashCode(indices), indicesOptions, policy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Request other = (Request) obj;
|
||||
return Objects.deepEquals(indices, other.indices) &&
|
||||
Objects.equals(indicesOptions, other.indicesOptions) &&
|
||||
Objects.equals(policy, other.policy);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.SetPolicyForIndexAction.Request;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class SetPolicyForIndexRequestTests extends AbstractStreamableTestCase<SetPolicyForIndexAction.Request> {
|
||||
|
||||
@Override
|
||||
protected Request createTestInstance() {
|
||||
Request request = new Request(randomAlphaOfLength(20), generateRandomStringArray(20, 20, false));
|
||||
if (randomBoolean()) {
|
||||
IndicesOptions indicesOptions = IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(),
|
||||
randomBoolean(), randomBoolean(), randomBoolean());
|
||||
request.indicesOptions(indicesOptions);
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request createBlankInstance() {
|
||||
return new Request();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request mutateInstance(Request instance) throws IOException {
|
||||
String[] indices = instance.indices();
|
||||
IndicesOptions indicesOptions = instance.indicesOptions();
|
||||
String policy = instance.policy();
|
||||
switch (between(0, 2)) {
|
||||
case 0:
|
||||
indices = randomValueOtherThanMany(i -> Arrays.equals(i, instance.indices()),
|
||||
() -> generateRandomStringArray(20, 20, false));
|
||||
break;
|
||||
case 1:
|
||||
indicesOptions = randomValueOtherThan(indicesOptions, () -> IndicesOptions.fromOptions(randomBoolean(), randomBoolean(),
|
||||
randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
|
||||
break;
|
||||
case 2:
|
||||
policy = randomValueOtherThan(policy, () -> randomAlphaOfLength(20));
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Illegal randomisation branch");
|
||||
}
|
||||
Request newRequest = new Request(policy, indices);
|
||||
newRequest.indicesOptions(indicesOptions);
|
||||
return newRequest;
|
||||
}
|
||||
|
||||
public void testNullIndices() {
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> new Request(randomAlphaOfLength(20), (String[]) null));
|
||||
assertEquals("indices cannot be null", exception.getMessage());
|
||||
}
|
||||
|
||||
public void testNullPolicy() {
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> new Request(null, generateRandomStringArray(20, 20, false)));
|
||||
assertEquals("policy cannot be null", exception.getMessage());
|
||||
}
|
||||
|
||||
public void testValidate() {
|
||||
Request request = createTestInstance();
|
||||
assertNull(request.validate());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.SetPolicyForIndexAction.Response;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class SetPolicyForIndexResponseTests extends AbstractStreamableXContentTestCase<SetPolicyForIndexAction.Response> {
|
||||
|
||||
@Override
|
||||
protected Response createBlankInstance() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response createTestInstance() {
|
||||
List<String> failedIndexes = Arrays.asList(generateRandomStringArray(20, 20, false));
|
||||
return new Response(failedIndexes);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response mutateInstance(Response instance) throws IOException {
|
||||
List<String> failedIndices = randomValueOtherThan(instance.getFailedIndexes(),
|
||||
() -> Arrays.asList(generateRandomStringArray(20, 20, false)));
|
||||
return new Response(failedIndices);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response doParseInstance(XContentParser parser) throws IOException {
|
||||
return Response.PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public void testNullFailedIndices() {
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new Response((List<String>) null));
|
||||
assertEquals("failed_indexes cannot be null", exception.getMessage());
|
||||
}
|
||||
|
||||
public void testHasFailures() {
|
||||
Response response = new Response(new ArrayList<>());
|
||||
assertFalse(response.hasFailures());
|
||||
assertEquals(Collections.emptyList(), response.getFailedIndexes());
|
||||
|
||||
int size = randomIntBetween(1, 10);
|
||||
List<String> failedIndexes = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
failedIndexes.add(randomAlphaOfLength(20));
|
||||
}
|
||||
response = new Response(failedIndexes);
|
||||
assertTrue(response.hasFailures());
|
||||
assertEquals(failedIndexes, response.getFailedIndexes());
|
||||
}
|
||||
|
||||
}
|
|
@ -34,18 +34,21 @@ import org.elasticsearch.xpack.core.XPackPlugin;
|
|||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.SetPolicyForIndexAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.DeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.ExplainLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestSetPolicyForIndexAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestDeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestExplainLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestGetLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestMoveToStepAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestPutLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestRetryAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportSetPolicyForIndexAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportDeleteLifcycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportExplainLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.TransportGetLifecycleAction;
|
||||
|
@ -147,6 +150,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
new RestGetLifecycleAction(settings, restController),
|
||||
new RestDeleteLifecycleAction(settings, restController),
|
||||
new RestExplainLifecycleAction(settings, restController),
|
||||
new RestSetPolicyForIndexAction(settings, restController),
|
||||
new RestMoveToStepAction(settings, restController),
|
||||
new RestRetryAction(settings, restController)
|
||||
);
|
||||
|
@ -162,6 +166,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
new ActionHandler<>(GetLifecycleAction.INSTANCE, TransportGetLifecycleAction.class),
|
||||
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifcycleAction.class),
|
||||
new ActionHandler<>(ExplainLifecycleAction.INSTANCE, TransportExplainLifecycleAction.class),
|
||||
new ActionHandler<>(SetPolicyForIndexAction.INSTANCE, TransportSetPolicyForIndexAction.class),
|
||||
new ActionHandler<>(MoveToStepAction.INSTANCE, TransportMoveToStepAction.class),
|
||||
new ActionHandler<>(RetryAction.INSTANCE, TransportRetryAction.class));
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.indexlifecycle;
|
|||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.admin.indices.shrink.ShrinkAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
|
@ -25,14 +26,15 @@ import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class IndexLifecycleRunner {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleRunner.class);
|
||||
|
@ -274,4 +276,64 @@ public class IndexLifecycleRunner {
|
|||
private void setStepInfo(Index index, String policy, StepKey currentStepKey, ToXContentObject stepInfo) {
|
||||
clusterService.submitStateUpdateTask("ILM", new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo));
|
||||
}
|
||||
|
||||
public static ClusterState setPolicyForIndexes(final String newPolicyName, final Index[] indices, ClusterState currentState,
|
||||
LifecyclePolicy newPolicy, List<String> failedIndexes) {
|
||||
MetaData.Builder newMetadata = MetaData.builder(currentState.getMetaData());
|
||||
boolean clusterStateChanged = false;
|
||||
for (Index index : indices) {
|
||||
IndexMetaData indexMetadata = currentState.getMetaData().index(index);
|
||||
if (indexMetadata == null) {
|
||||
// Index doesn't exist so fail it
|
||||
failedIndexes.add(index.getName());
|
||||
} else {
|
||||
IndexMetaData.Builder newIdxMetadata = IndexLifecycleRunner.setPolicyForIndex(newPolicyName, newPolicy, failedIndexes,
|
||||
index, indexMetadata);
|
||||
if (newIdxMetadata != null) {
|
||||
newMetadata.put(newIdxMetadata);
|
||||
clusterStateChanged = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (clusterStateChanged) {
|
||||
ClusterState.Builder newClusterState = ClusterState.builder(currentState);
|
||||
newClusterState.metaData(newMetadata);
|
||||
return newClusterState.build();
|
||||
} else {
|
||||
return currentState;
|
||||
}
|
||||
}
|
||||
|
||||
private static IndexMetaData.Builder setPolicyForIndex(final String newPolicyName, LifecyclePolicy newPolicy,
|
||||
List<String> failedIndexes,
|
||||
Index index, IndexMetaData indexMetadata) {
|
||||
Settings idxSettings = indexMetadata.getSettings();
|
||||
Settings.Builder newSettings = Settings.builder().put(idxSettings);
|
||||
String currentPolicy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxSettings);
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(idxSettings);
|
||||
|
||||
if (canSetPolicy(currentStepKey, currentPolicy, newPolicy)) {
|
||||
newSettings.put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), newPolicyName);
|
||||
// NORELEASE check if current step exists in new policy and if not move to next available step
|
||||
return IndexMetaData.builder(indexMetadata).settings(newSettings);
|
||||
} else {
|
||||
failedIndexes.add(index.getName());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean canSetPolicy(StepKey currentStepKey, String currentPolicyName, LifecyclePolicy newPolicy) {
|
||||
if (Strings.hasLength(currentPolicyName)) {
|
||||
if (ShrinkAction.NAME.equals(currentStepKey.getAction())) {
|
||||
// Index is in the shrink action so fail it
|
||||
// NORELEASE also need to check if the shrink action has changed between oldPolicy and newPolicy
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
// Index not previously managed by ILM so safe to change policy
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.apache.lucene.util.SetOnce;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
|
@ -35,7 +36,7 @@ import java.util.function.LongSupplier;
|
|||
* A service which runs the {@link LifecyclePolicy}s associated with indexes.
|
||||
*/
|
||||
public class IndexLifecycleService extends AbstractComponent
|
||||
implements ClusterStateListener, SchedulerEngine.Listener, Closeable {
|
||||
implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleService.class);
|
||||
|
||||
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
|
||||
|
@ -59,6 +60,7 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
this.scheduledJob = null;
|
||||
this.policyRegistry = new PolicyStepsRegistry();
|
||||
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, nowSupplier);
|
||||
clusterService.addStateApplier(this);
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
|
@ -98,11 +100,6 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
|
||||
boolean pollIntervalSettingChanged = !pollInterval.equals(previousPollInterval);
|
||||
|
||||
if (lifecycleMetadata != null && event.changedCustomMetaDataSet().contains(IndexLifecycleMetadata.TYPE)) {
|
||||
// update policy steps registry
|
||||
policyRegistry.update(event.state(), client, nowSupplier);
|
||||
}
|
||||
|
||||
if (lifecycleMetadata == null) { // no lifecycle metadata, install initial empty metadata state
|
||||
lifecycleMetadata = new IndexLifecycleMetadata(Collections.emptySortedMap());
|
||||
installMetadata(lifecycleMetadata);
|
||||
|
@ -122,6 +119,18 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyClusterState(ClusterChangedEvent event) {
|
||||
if (event.localNodeMaster()) { // only act if we are master, otherwise
|
||||
// keep idle until elected
|
||||
IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
if (lifecycleMetadata != null && event.changedCustomMetaDataSet().contains(IndexLifecycleMetadata.TYPE)) {
|
||||
// update policy steps registry
|
||||
policyRegistry.update(event.state(), client, nowSupplier);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cancelJob() {
|
||||
if (scheduler.get() != null) {
|
||||
scheduler.get().remove(IndexLifecycle.NAME);
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.SetPolicyForIndexAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RestSetPolicyForIndexAction extends BaseRestHandler {
|
||||
|
||||
public RestSetPolicyForIndexAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.PUT, "_" + IndexLifecycle.NAME + "/set_policy/{new_policy}", this);
|
||||
controller.registerHandler(RestRequest.Method.PUT, "{index}/_" + IndexLifecycle.NAME + "/set_policy/{new_policy}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "xpack_set_policy_for_index_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
|
||||
String[] indexes = Strings.splitStringByCommaToArray(restRequest.param("index"));
|
||||
String newPolicyName = restRequest.param("new_policy");
|
||||
SetPolicyForIndexAction.Request changePolicyRequest = new SetPolicyForIndexAction.Request(newPolicyName, indexes);
|
||||
changePolicyRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", changePolicyRequest.masterNodeTimeout()));
|
||||
|
||||
return channel -> client.execute(SetPolicyForIndexAction.INSTANCE, changePolicyRequest, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.SetPolicyForIndexAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.SetPolicyForIndexAction.Request;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.SetPolicyForIndexAction.Response;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunner;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class TransportSetPolicyForIndexAction extends TransportMasterNodeAction<Request, Response> {
|
||||
|
||||
@Inject
|
||||
public TransportSetPolicyForIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, SetPolicyForIndexAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver, Request::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
|
||||
final String newPolicyName = request.policy();
|
||||
final Index[] indices = indexNameExpressionResolver.concreteIndices(state, request.indicesOptions(), request.indices());
|
||||
clusterService.submitStateUpdateTask("change-lifecycle-for-index-" + newPolicyName,
|
||||
new AckedClusterStateUpdateTask<Response>(request, listener) {
|
||||
|
||||
private final List<String> failedIndexes = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
IndexLifecycleMetadata ilmMetadata = (IndexLifecycleMetadata) currentState.metaData()
|
||||
.custom(IndexLifecycleMetadata.TYPE);
|
||||
LifecyclePolicy newPolicy = ilmMetadata.getPolicies().get(newPolicyName);
|
||||
|
||||
if (newPolicy == null) {
|
||||
throw new ResourceNotFoundException("Policy does not exist [{}]", newPolicyName);
|
||||
}
|
||||
|
||||
return IndexLifecycleRunner.setPolicyForIndexes(newPolicyName, indices, currentState, newPolicy, failedIndexes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse(boolean acknowledged) {
|
||||
return new Response(failedIndexes);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.indexlifecycle;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.shrink.ShrinkAction;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -21,11 +22,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AbstractStepTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.MockStep;
|
||||
|
@ -33,11 +36,15 @@ import org.elasticsearch.xpack.core.indexlifecycle.RandomStepInfo;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TestLifecycleType;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.SortedMap;
|
||||
|
@ -792,6 +799,128 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
return ClusterState.builder(new ClusterName("my_cluster")).metaData(metadata).build();
|
||||
}
|
||||
|
||||
public void testSetPolicyForIndex() {
|
||||
long now = randomNonNegativeLong();
|
||||
String indexName = randomAlphaOfLength(10);
|
||||
String oldPolicyName = "old_policy";
|
||||
String newPolicyName = "new_policy";
|
||||
LifecyclePolicy newPolicy = new LifecyclePolicy(TestLifecycleType.INSTANCE, newPolicyName, Collections.emptyMap());
|
||||
StepKey currentStep = AbstractStepTestCase.randomStepKey();
|
||||
Settings.Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, oldPolicyName)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()).put(LifecycleSettings.LIFECYCLE_SKIP, true);
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
Index[] indices = new Index[] { index };
|
||||
List<String> failedIndexes = new ArrayList<>();
|
||||
|
||||
ClusterState newClusterState = IndexLifecycleRunner.setPolicyForIndexes(newPolicyName, indices, clusterState, newPolicy,
|
||||
failedIndexes);
|
||||
|
||||
assertTrue(failedIndexes.isEmpty());
|
||||
assertClusterStateOnPolicy(clusterState, index, newPolicyName, currentStep, currentStep, newClusterState, now);
|
||||
}
|
||||
|
||||
public void testSetPolicyForIndexNoCurrentPolicy() {
|
||||
long now = randomNonNegativeLong();
|
||||
String indexName = randomAlphaOfLength(10);
|
||||
String newPolicyName = "new_policy";
|
||||
LifecyclePolicy newPolicy = new LifecyclePolicy(TestLifecycleType.INSTANCE, newPolicyName, Collections.emptyMap());
|
||||
StepKey currentStep = new StepKey("", "", "");
|
||||
Settings.Builder indexSettingsBuilder = Settings.builder();
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
Index[] indices = new Index[] { index };
|
||||
List<String> failedIndexes = new ArrayList<>();
|
||||
|
||||
ClusterState newClusterState = IndexLifecycleRunner.setPolicyForIndexes(newPolicyName, indices, clusterState, newPolicy,
|
||||
failedIndexes);
|
||||
|
||||
assertTrue(failedIndexes.isEmpty());
|
||||
assertClusterStateOnPolicy(clusterState, index, newPolicyName, currentStep, currentStep, newClusterState, now);
|
||||
}
|
||||
|
||||
public void testSetPolicyForIndexIndexDoesntExist() {
|
||||
String indexName = randomAlphaOfLength(10);
|
||||
String oldPolicyName = "old_policy";
|
||||
String newPolicyName = "new_policy";
|
||||
LifecyclePolicy newPolicy = new LifecyclePolicy(TestLifecycleType.INSTANCE, newPolicyName, Collections.emptyMap());
|
||||
StepKey currentStep = AbstractStepTestCase.randomStepKey();
|
||||
Settings.Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, oldPolicyName)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()).put(LifecycleSettings.LIFECYCLE_SKIP, true);
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
Index index = new Index("doesnt_exist", "im_not_here");
|
||||
Index[] indices = new Index[] { index };
|
||||
List<String> failedIndexes = new ArrayList<>();
|
||||
|
||||
ClusterState newClusterState = IndexLifecycleRunner.setPolicyForIndexes(newPolicyName, indices, clusterState, newPolicy,
|
||||
failedIndexes);
|
||||
|
||||
assertEquals(1, failedIndexes.size());
|
||||
assertEquals("doesnt_exist", failedIndexes.get(0));
|
||||
assertSame(clusterState, newClusterState);
|
||||
}
|
||||
|
||||
public void testSetPolicyForIndexIndexInShrink() {
|
||||
String indexName = randomAlphaOfLength(10);
|
||||
String oldPolicyName = "old_policy";
|
||||
String newPolicyName = "new_policy";
|
||||
LifecyclePolicy newPolicy = new LifecyclePolicy(TestLifecycleType.INSTANCE, newPolicyName, Collections.emptyMap());
|
||||
StepKey currentStep = new StepKey(randomAlphaOfLength(10), ShrinkAction.NAME, randomAlphaOfLength(10));
|
||||
Settings.Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, oldPolicyName)
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()).put(LifecycleSettings.LIFECYCLE_SKIP, true);
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder);
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
Index[] indices = new Index[] { index };
|
||||
List<String> failedIndexes = new ArrayList<>();
|
||||
|
||||
ClusterState newClusterState = IndexLifecycleRunner.setPolicyForIndexes(newPolicyName, indices, clusterState, newPolicy,
|
||||
failedIndexes);
|
||||
|
||||
assertEquals(1, failedIndexes.size());
|
||||
assertEquals(index.getName(), failedIndexes.get(0));
|
||||
assertSame(clusterState, newClusterState);
|
||||
}
|
||||
|
||||
public static void assertClusterStateOnPolicy(ClusterState oldClusterState, Index index, String expectedPolicy, StepKey previousStep,
|
||||
StepKey expectedStep, ClusterState newClusterState, long now) {
|
||||
assertNotSame(oldClusterState, newClusterState);
|
||||
MetaData newMetadata = newClusterState.metaData();
|
||||
assertNotSame(oldClusterState.metaData(), newMetadata);
|
||||
IndexMetaData newIndexMetadata = newMetadata.getIndexSafe(index);
|
||||
assertNotSame(oldClusterState.metaData().index(index), newIndexMetadata);
|
||||
Settings newIndexSettings = newIndexMetadata.getSettings();
|
||||
assertNotSame(oldClusterState.metaData().index(index).getSettings(), newIndexSettings);
|
||||
assertEquals(expectedStep.getPhase(), LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(newIndexSettings));
|
||||
assertEquals(expectedStep.getAction(), LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(newIndexSettings));
|
||||
assertEquals(expectedStep.getName(), LifecycleSettings.LIFECYCLE_STEP_SETTING.get(newIndexSettings));
|
||||
if (previousStep.getPhase().equals(expectedStep.getPhase())) {
|
||||
assertEquals(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
|
||||
LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newIndexSettings));
|
||||
} else {
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newIndexSettings));
|
||||
}
|
||||
if (previousStep.getAction().equals(expectedStep.getAction())) {
|
||||
assertEquals(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
|
||||
LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newIndexSettings));
|
||||
} else {
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newIndexSettings));
|
||||
}
|
||||
if (previousStep.getName().equals(expectedStep.getName())) {
|
||||
assertEquals(LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
|
||||
LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newIndexSettings));
|
||||
} else {
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newIndexSettings));
|
||||
}
|
||||
assertEquals("", LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.get(newIndexSettings));
|
||||
assertEquals("", LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newIndexSettings));
|
||||
}
|
||||
|
||||
public static void assertClusterStateOnNextStep(ClusterState oldClusterState, Index index, StepKey currentStep, StepKey nextStep,
|
||||
ClusterState newClusterState, long now) {
|
||||
assertNotSame(oldClusterState, newClusterState);
|
||||
|
|
|
@ -45,7 +45,7 @@ import static org.mockito.Mockito.doAnswer;
|
|||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.only;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -87,6 +87,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
indexLifecycleService = new IndexLifecycleService(Settings.EMPTY, client, clusterService, clock,
|
||||
threadPool, () -> now);
|
||||
Mockito.verify(clusterService).addListener(indexLifecycleService);
|
||||
Mockito.verify(clusterService).addStateApplier(indexLifecycleService);
|
||||
}
|
||||
|
||||
public void testOnlyChangesStateOnMaster() throws Exception {
|
||||
|
@ -100,8 +101,11 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
.build();
|
||||
ClusterChangedEvent event = new ClusterChangedEvent("_source", state, state);
|
||||
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
verify(clusterService, only()).addListener(any());
|
||||
verify(clusterService, times(1)).addListener(any());
|
||||
verify(clusterService, times(1)).addStateApplier(any());
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
assertNull(indexLifecycleService.getScheduler());
|
||||
}
|
||||
|
||||
|
@ -120,8 +124,11 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
.build();
|
||||
ClusterChangedEvent event = new ClusterChangedEvent("_source", state, state);
|
||||
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
verify(clusterService, only()).addListener(any());
|
||||
verify(clusterService, times(1)).addListener(any());
|
||||
verify(clusterService, times(1)).addStateApplier(any());
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
assertNull(indexLifecycleService.getScheduler());
|
||||
assertNull(indexLifecycleService.getScheduledJob());
|
||||
|
||||
|
@ -134,6 +141,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
|
||||
// Check that when the node is first elected as master it sets up
|
||||
// the scheduler and job
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
assertNotNull(indexLifecycleService.getScheduler());
|
||||
|
@ -148,6 +156,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
event = new ClusterChangedEvent("_source", state, state);
|
||||
|
||||
// Check that when the node is un-elected as master it cancels the job
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
assertNotNull(indexLifecycleService.getScheduler());
|
||||
|
@ -162,6 +171,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
event = new ClusterChangedEvent("_source", state, state);
|
||||
|
||||
// Check that when the node is re-elected as master it cancels the job
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
assertNotNull(indexLifecycleService.getScheduler());
|
||||
|
@ -189,10 +199,13 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
|
||||
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
|
||||
verify(clusterService).addListener(any());
|
||||
verify(clusterService, times(1)).addListener(any());
|
||||
verify(clusterService, times(1)).addStateApplier(any());
|
||||
verify(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
assertNull(indexLifecycleService.getScheduler());
|
||||
}
|
||||
|
||||
|
@ -219,19 +232,26 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
.build();
|
||||
ClusterChangedEvent event = new ClusterChangedEvent("_source", currentState, previousState);
|
||||
|
||||
indexLifecycleService.clusterChanged(new ClusterChangedEvent("_source", previousState, previousState));
|
||||
ClusterChangedEvent noChangeEvent = new ClusterChangedEvent("_source", previousState, previousState);
|
||||
indexLifecycleService.applyClusterState(noChangeEvent);
|
||||
indexLifecycleService.clusterChanged(noChangeEvent);
|
||||
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
|
||||
assertThat(((TimeValueSchedule) indexLifecycleService.getScheduledJob().getSchedule()).getInterval(),
|
||||
equalTo(TimeValue.timeValueSeconds(3)));
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
|
||||
assertThat(((TimeValueSchedule) indexLifecycleService.getScheduledJob().getSchedule()).getInterval(), equalTo(pollInterval));
|
||||
indexLifecycleService.clusterChanged(new ClusterChangedEvent("_source", currentState, currentState));
|
||||
noChangeEvent = new ClusterChangedEvent("_source", currentState, currentState);
|
||||
indexLifecycleService.applyClusterState(noChangeEvent);
|
||||
indexLifecycleService.clusterChanged(noChangeEvent);
|
||||
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
|
||||
assertThat(((TimeValueSchedule) indexLifecycleService.getScheduledJob().getSchedule()).getInterval(), equalTo(pollInterval));
|
||||
|
||||
verify(clusterService, only()).addListener(any());
|
||||
verify(clusterService, times(1)).addListener(any());
|
||||
verify(clusterService, times(1)).addStateApplier(any());
|
||||
verify(clusterService, never()).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testInstallMetadataFail() {
|
||||
|
@ -250,8 +270,10 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
Exception exception = expectThrows(RuntimeException.class, () -> indexLifecycleService.clusterChanged(event));
|
||||
assertThat(exception.getMessage(), equalTo("error"));
|
||||
|
||||
verify(clusterService).addListener(any());
|
||||
verify(clusterService, times(1)).addListener(any());
|
||||
verify(clusterService, times(1)).addStateApplier(any());
|
||||
verify(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
assertNull(indexLifecycleService.getScheduler());
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
{
|
||||
"xpack.index_lifecycle.set_policy": {
|
||||
"documentation": "http://www.elastic.co/guide/en/index_lifecycle/current/index_lifecycle.html",
|
||||
"methods": [ "PUT" ],
|
||||
"url": {
|
||||
"path": "/{index}/_index_lifecycle/set_policy/{new_policy}",
|
||||
"paths": ["/{index}/_index_lifecycle/set_policy/{new_policy}", "/_index_lifecycle/set_policy/{new_policy}"],
|
||||
"parts": {
|
||||
"index": {
|
||||
"type" : "string",
|
||||
"description" : "The name of the index to set policy on"
|
||||
},
|
||||
"new_policy": {
|
||||
"type" : "string",
|
||||
"description" : "The name of the new policy to set"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
}
|
||||
},
|
||||
"body": null
|
||||
}
|
||||
}
|
|
@ -0,0 +1,223 @@
|
|||
---
|
||||
setup:
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: yellow
|
||||
- do:
|
||||
acknowlege: true
|
||||
xpack.index_lifecycle.put_lifecycle:
|
||||
lifecycle: "my_moveable_timeseries_lifecycle"
|
||||
body: |
|
||||
{
|
||||
"policy": {
|
||||
"type": "timeseries",
|
||||
"phases": {
|
||||
"warm": {
|
||||
"after": "1000s",
|
||||
"actions": {
|
||||
"forcemerge": {
|
||||
"max_num_segments": 10000
|
||||
}
|
||||
}
|
||||
},
|
||||
"hot": {
|
||||
"after": "1000s",
|
||||
"actions": { }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
acknowlege: true
|
||||
xpack.index_lifecycle.put_lifecycle:
|
||||
lifecycle: "my_alternative_timeseries_lifecycle"
|
||||
body: |
|
||||
{
|
||||
"policy": {
|
||||
"type": "timeseries",
|
||||
"phases": {
|
||||
"warm": {
|
||||
"after": "1000s",
|
||||
"actions": {
|
||||
"forcemerge": {
|
||||
"max_num_segments": 10000
|
||||
}
|
||||
}
|
||||
},
|
||||
"hot": {
|
||||
"after": "1000s",
|
||||
"actions": { }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_alternative_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: my_index
|
||||
body:
|
||||
settings:
|
||||
index.lifecycle.name: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: my_index2
|
||||
body:
|
||||
settings:
|
||||
index.lifecycle.name: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: another_index
|
||||
body:
|
||||
settings:
|
||||
index.lifecycle.name: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: unmanaged_index
|
||||
body:
|
||||
settings: {}
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: my_index_no_policy
|
||||
|
||||
---
|
||||
teardown:
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.delete:
|
||||
index: my_index
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.delete:
|
||||
index: my_index2
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.delete:
|
||||
index: another_index
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.delete:
|
||||
index: unmanaged_index
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
indices.delete:
|
||||
index: my_index_no_policy
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.delete_lifecycle:
|
||||
lifecycle: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_moveable_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.delete_lifecycle:
|
||||
lifecycle: "my_alternative_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_alternative_timeseries_lifecycle"
|
||||
|
||||
---
|
||||
"Test Set Policy Single Index":
|
||||
|
||||
- do:
|
||||
indices.get_settings:
|
||||
index: "another_index"
|
||||
|
||||
- match: { another_index.settings.index.lifecycle.name: my_moveable_timeseries_lifecycle }
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.set_policy:
|
||||
index: "another_index"
|
||||
new_policy: my_alternative_timeseries_lifecycle
|
||||
|
||||
- is_false: has_failures
|
||||
- length: { failed_indexes: 0 }
|
||||
|
||||
- do:
|
||||
indices.get_settings:
|
||||
index: "another_index"
|
||||
|
||||
- match: { another_index.settings.index.lifecycle.name: my_alternative_timeseries_lifecycle }
|
||||
|
||||
---
|
||||
"Test Set Policy Index Pattern":
|
||||
|
||||
- do:
|
||||
indices.get_settings:
|
||||
index: "my_*"
|
||||
|
||||
- match: { my_index.settings.index.lifecycle.name: my_moveable_timeseries_lifecycle }
|
||||
- match: { my_index2.settings.index.lifecycle.name: my_moveable_timeseries_lifecycle }
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.set_policy:
|
||||
index: "my_*"
|
||||
new_policy: my_alternative_timeseries_lifecycle
|
||||
|
||||
- is_false: has_failures
|
||||
- length: { failed_indexes: 0 }
|
||||
|
||||
- do:
|
||||
indices.get_settings:
|
||||
index: "my_*"
|
||||
|
||||
- match: { my_index.settings.index.lifecycle.name: my_alternative_timeseries_lifecycle }
|
||||
- match: { my_index2.settings.index.lifecycle.name: my_alternative_timeseries_lifecycle }
|
||||
|
||||
---
|
||||
"Test Set Policy Unmanaged Index":
|
||||
|
||||
- do:
|
||||
indices.get_settings:
|
||||
index: "unmanaged_index"
|
||||
|
||||
- is_false: unmanaged_index.settings.index.lifecycle.name
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.set_policy:
|
||||
index: "unmanaged_index"
|
||||
new_policy: my_alternative_timeseries_lifecycle
|
||||
|
||||
- is_false: has_failures
|
||||
- length: { failed_indexes: 0 }
|
||||
|
||||
- do:
|
||||
indices.get_settings:
|
||||
index: "unmanaged_index"
|
||||
|
||||
- match: { unmanaged_index.settings.index.lifecycle.name: my_alternative_timeseries_lifecycle }
|
||||
|
||||
---
|
||||
"Test Set Policy Index Does Not Exist":
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
xpack.index_lifecycle.set_policy:
|
||||
index: "doesnt_exist"
|
||||
new_policy: my_alternative_timeseries_lifecycle
|
Loading…
Reference in New Issue