diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStep.java index 8e0626425b4..570c4fdf267 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStep.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; import java.util.Map; @@ -20,7 +21,8 @@ abstract class AbstractUnfollowIndexStep extends AsyncActionStep { } @Override - public final void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) { + public final void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, + ClusterStateObserver observer, Listener listener) { String followerIndex = indexMetaData.getIndex().getName(); Map customIndexMetadata = indexMetaData.getCustomData(CCR_METADATA_KEY); if (customIndexMetadata == null) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncActionStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncActionStep.java index 700c8171645..3195d2d5257 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncActionStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncActionStep.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; /** @@ -29,7 +30,8 @@ public abstract class AsyncActionStep extends Step { return true; } - public abstract void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener); + public abstract void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, + ClusterStateObserver observer, Listener listener); public interface Listener { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncRetryDuringSnapshotActionStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncRetryDuringSnapshotActionStep.java new file mode 100644 index 00000000000..85c91a795f0 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncRetryDuringSnapshotActionStep.java @@ -0,0 +1,165 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotInProgressException; + +import java.util.function.Consumer; + +/** + * This is an abstract AsyncActionStep that wraps the performed action listener, checking to see + * if the action fails due to a snapshot being in progress. If a snapshot is in progress, it + * registers an observer and waits to try again when a snapshot is no longer running. + */ +public abstract class AsyncRetryDuringSnapshotActionStep extends AsyncActionStep { + private final Logger logger = LogManager.getLogger(AsyncRetryDuringSnapshotActionStep.class); + + public AsyncRetryDuringSnapshotActionStep(StepKey key, StepKey nextStepKey, Client client) { + super(key, nextStepKey, client); + } + + @Override + public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, + ClusterStateObserver observer, Listener listener) { + // Wrap the original listener to handle exceptions caused by ongoing snapshots + SnapshotExceptionListener snapshotExceptionListener = new SnapshotExceptionListener(indexMetaData.getIndex(), listener, observer); + performDuringNoSnapshot(indexMetaData, currentClusterState, snapshotExceptionListener); + } + + /** + * Method to be performed during which no snapshots for the index are already underway. + */ + abstract void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener); + + /** + * SnapshotExceptionListener is an injected listener wrapper that checks to see if a particular + * action failed due to a {@code SnapshotInProgressException}. If it did, then it registers a + * ClusterStateObserver listener waiting for the next time the snapshot is not running, + * re-running the step's {@link #performAction(IndexMetaData, ClusterState, ClusterStateObserver, Listener)} + * method when the snapshot is no longer running. + */ + class SnapshotExceptionListener implements AsyncActionStep.Listener { + private final Index index; + private final Listener originalListener; + private final ClusterStateObserver observer; + + SnapshotExceptionListener(Index index, Listener originalListener, ClusterStateObserver observer) { + this.index = index; + this.originalListener = originalListener; + this.observer = observer; + } + + @Override + public void onResponse(boolean complete) { + originalListener.onResponse(complete); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof SnapshotInProgressException) { + try { + logger.debug("[{}] attempted to run ILM step but a snapshot is in progress, step will retry at a later time", + index.getName()); + observer.waitForNextChange( + new NoSnapshotRunningListener(observer, index.getName(), state -> { + IndexMetaData idxMeta = state.metaData().index(index); + if (idxMeta == null) { + // The index has since been deleted, mission accomplished! + originalListener.onResponse(true); + } + // Re-invoke the performAction method with the new state + performAction(idxMeta, state, observer, originalListener); + }, originalListener::onFailure), + // TODO: what is a good timeout value for no new state received during this time? + TimeValue.timeValueHours(12)); + } catch (Exception secondError) { + // There was a second error trying to set up an observer, + // fail the original listener + secondError.addSuppressed(e); + originalListener.onFailure(secondError); + } + } else { + originalListener.onFailure(e); + } + } + } + + /** + * A {@link ClusterStateObserver.Listener} that invokes the given function with the new state, + * once no snapshots are running. If a snapshot is still running it registers a new listener + * and tries again. Passes any exceptions to the original exception listener if they occur. + */ + class NoSnapshotRunningListener implements ClusterStateObserver.Listener { + + private final Consumer reRun; + private final Consumer exceptionConsumer; + private final ClusterStateObserver observer; + private final String indexName; + + NoSnapshotRunningListener(ClusterStateObserver observer, String indexName, + Consumer reRun, + Consumer exceptionConsumer) { + this.observer = observer; + this.reRun = reRun; + this.exceptionConsumer = exceptionConsumer; + this.indexName = indexName; + } + + @Override + public void onNewClusterState(ClusterState state) { + try { + if (snapshotInProgress(state)) { + observer.waitForNextChange(this); + } else { + logger.debug("[{}] retrying ILM step after snapshot has completed", indexName); + reRun.accept(state); + } + } catch (Exception e) { + exceptionConsumer.accept(e); + } + } + + private boolean snapshotInProgress(ClusterState state) { + SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) { + // No snapshots are running, new state is acceptable to proceed + return false; + } + + for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { + if (snapshot.indices().stream() + .map(IndexId::getName) + .anyMatch(name -> name.equals(indexName))) { + // There is a snapshot running with this index name + return true; + } + } + // There are snapshots, but none for this index, so it's okay to proceed with this state + return false; + } + + @Override + public void onClusterServiceClose() { + // This means the cluster is being shut down, so nothing to do here + } + + @Override + public void onTimeout(TimeValue timeout) { + exceptionConsumer.accept(new IllegalStateException("step timed out while waiting for snapshots to complete")); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStep.java index 3fb6e145236..149bb6dfef3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStep.java @@ -8,8 +8,14 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; -final class CloseFollowerIndexStep extends AbstractUnfollowIndexStep { +import java.util.Map; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; + +final class CloseFollowerIndexStep extends AsyncRetryDuringSnapshotActionStep { static final String NAME = "close-follower-index"; @@ -18,7 +24,14 @@ final class CloseFollowerIndexStep extends AbstractUnfollowIndexStep { } @Override - void innerPerformAction(String followerIndex, Listener listener) { + void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) { + String followerIndex = indexMetaData.getIndex().getName(); + Map customIndexMetadata = indexMetaData.getCustomData(CCR_METADATA_KEY); + if (customIndexMetadata == null) { + listener.onResponse(true); + return; + } + CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex); getClient().admin().indices().close(closeIndexRequest, ActionListener.wrap( r -> { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStep.java index f34da641a0c..62a5b7bea4b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStep.java @@ -14,7 +14,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; /** * Deletes a single index. */ -public class DeleteStep extends AsyncActionStep { +public class DeleteStep extends AsyncRetryDuringSnapshotActionStep { public static final String NAME = "delete"; public DeleteStep(StepKey key, StepKey nextStepKey, Client client) { @@ -22,10 +22,10 @@ public class DeleteStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { + public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { getClient().admin().indices() .delete(new DeleteIndexRequest(indexMetaData.getIndex().getName()), - ActionListener.wrap(response -> listener.onResponse(true) , listener::onFailure)); + ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure)); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeStep.java index 799959ce69f..871a95419a4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeStep.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; import java.util.Objects; @@ -30,7 +31,7 @@ public class ForceMergeStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { + public void performAction(IndexMetaData indexMetaData, ClusterState currentState, ClusterStateObserver observer, Listener listener) { ForceMergeRequest request = new ForceMergeRequest(indexMetaData.getIndex().getName()); request.maxNumSegments(maxNumSegments); getClient().admin().indices() diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/FreezeStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/FreezeStep.java index 523aad10a48..ae7b0af6222 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/FreezeStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/FreezeStep.java @@ -14,7 +14,7 @@ import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction; /** * Freezes an index. */ -public class FreezeStep extends AsyncActionStep { +public class FreezeStep extends AsyncRetryDuringSnapshotActionStep { public static final String NAME = "freeze"; public FreezeStep(StepKey key, StepKey nextStepKey, Client client) { @@ -22,7 +22,7 @@ public class FreezeStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { + public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { getClient().admin().indices().execute(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE, new TransportFreezeIndexAction.FreezeRequest(indexMetaData.getIndex().getName()), ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure)); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStep.java index 7ba2c4633ab..58ede7200cc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStep.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; final class OpenFollowerIndexStep extends AsyncActionStep { @@ -20,7 +21,8 @@ final class OpenFollowerIndexStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) { + public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, + ClusterStateObserver observer, Listener listener) { OpenIndexRequest request = new OpenIndexRequest(indexMetaData.getIndex().getName()); getClient().admin().indices().open(request, ActionListener.wrap( r -> { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java index f501c27d8c4..91b2bbe8264 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; @@ -30,7 +31,8 @@ public class RolloverStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) { + public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, + ClusterStateObserver observer, Listener listener) { boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetaData.getSettings()); if (indexingComplete) { logger.trace(indexMetaData.getIndex() + " has lifecycle complete set, skipping " + RolloverStep.NAME); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SetSingleNodeAllocateStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SetSingleNodeAllocateStep.java index 973dd7f0ba0..7973289b968 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SetSingleNodeAllocateStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SetSingleNodeAllocateStep.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; @@ -42,7 +43,7 @@ public class SetSingleNodeAllocateStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, Listener listener) { + public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, ClusterStateObserver observer, Listener listener) { RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, clusterState.getRoutingNodes(), clusterState, null, System.nanoTime()); List validNodeIds = new ArrayList<>(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkSetAliasStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkSetAliasStep.java index da59b16ded9..51497dccc2d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkSetAliasStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkSetAliasStep.java @@ -18,7 +18,7 @@ import java.util.Objects; * Following shrinking an index and deleting the original index, this step creates an alias with the same name as the original index which * points to the new shrunken index to allow clients to continue to use the original index name without being aware that it has shrunk. */ -public class ShrinkSetAliasStep extends AsyncActionStep { +public class ShrinkSetAliasStep extends AsyncRetryDuringSnapshotActionStep { public static final String NAME = "aliases"; private String shrunkIndexPrefix; @@ -32,7 +32,7 @@ public class ShrinkSetAliasStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { + public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { // get source index String index = indexMetaData.getIndex().getName(); // get target shrink index diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkStep.java index 19ddbfe04cf..0e9f84bc3a8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkStep.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; @@ -38,7 +39,7 @@ public class ShrinkStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { + public void performAction(IndexMetaData indexMetaData, ClusterState currentState, ClusterStateObserver observer, Listener listener) { LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); if (lifecycleState.getLifecycleDate() == null) { throw new IllegalStateException("source index [" + indexMetaData.getIndex().getName() + diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateSettingsStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateSettingsStep.java index c66c1427d07..9f6b3f7a84f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateSettingsStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateSettingsStep.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; @@ -28,7 +29,7 @@ public class UpdateSettingsStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { + public void performAction(IndexMetaData indexMetaData, ClusterState currentState, ClusterStateObserver observer, Listener listener) { UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName()).settings(settings); getClient().admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStepTestCase.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStepTestCase.java index 5ceb8ca6570..7ec81b5bae2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStepTestCase.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStepTestCase.java @@ -53,7 +53,7 @@ public abstract class AbstractUnfollowIndexStepTestCase { - - @Override - protected CloseFollowerIndexStep newInstance(Step.StepKey key, Step.StepKey nextKey, Client client) { - return new CloseFollowerIndexStep(key, nextKey, client); - } +public class CloseFollowerIndexStepTests extends AbstractStepTestCase { public void testCloseFollowingIndex() { IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") @@ -56,7 +51,7 @@ public class CloseFollowerIndexStepTests extends AbstractUnfollowIndexStepTestCa Boolean[] completed = new Boolean[1]; Exception[] failure = new Exception[1]; CloseFollowerIndexStep step = new CloseFollowerIndexStep(randomStepKey(), randomStepKey(), client); - step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { completed[0] = complete; @@ -98,7 +93,7 @@ public class CloseFollowerIndexStepTests extends AbstractUnfollowIndexStepTestCa Boolean[] completed = new Boolean[1]; Exception[] failure = new Exception[1]; CloseFollowerIndexStep step = new CloseFollowerIndexStep(randomStepKey(), randomStepKey(), client); - step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { completed[0] = complete; @@ -114,4 +109,30 @@ public class CloseFollowerIndexStepTests extends AbstractUnfollowIndexStepTestCa Mockito.verify(indicesClient).close(Mockito.any(), Mockito.any()); Mockito.verifyNoMoreInteractions(indicesClient); } + + @Override + protected CloseFollowerIndexStep createRandomInstance() { + Step.StepKey stepKey = randomStepKey(); + Step.StepKey nextStepKey = randomStepKey(); + return new CloseFollowerIndexStep(stepKey, nextStepKey, Mockito.mock(Client.class)); + } + + @Override + protected CloseFollowerIndexStep mutateInstance(CloseFollowerIndexStep instance) { + Step.StepKey key = instance.getKey(); + Step.StepKey nextKey = instance.getNextStepKey(); + + if (randomBoolean()) { + key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } else { + nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } + + return new CloseFollowerIndexStep(key, nextKey, instance.getClient()); + } + + @Override + protected CloseFollowerIndexStep copyInstance(CloseFollowerIndexStep instance) { + return new CloseFollowerIndexStep(instance.getKey(), instance.getNextStepKey(), instance.getClient()); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStepTests.java index c85df6de659..405a39cd590 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStepTests.java @@ -91,7 +91,7 @@ public class DeleteStepTests extends AbstractStepTestCase { SetOnce actionCompleted = new SetOnce<>(); DeleteStep step = createRandomInstance(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { actionCompleted.set(complete); @@ -138,7 +138,7 @@ public class DeleteStepTests extends AbstractStepTestCase { SetOnce exceptionThrown = new SetOnce<>(); DeleteStep step = createRandomInstance(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { throw new AssertionError("Unexpected method call"); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeStepTests.java index 9a38ddf3a26..4ce895ae349 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeStepTests.java @@ -88,7 +88,7 @@ public class ForceMergeStepTests extends AbstractStepTestCase { ForceMergeStep step = new ForceMergeStep(stepKey, nextStepKey, client, maxNumSegments); SetOnce completed = new SetOnce<>(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { completed.set(complete); @@ -129,7 +129,7 @@ public class ForceMergeStepTests extends AbstractStepTestCase { ForceMergeStep step = new ForceMergeStep(stepKey, nextStepKey, client, maxNumSegments); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { throw new AssertionError("unexpected method call"); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/FreezeStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/FreezeStepTests.java index 94ca2c2635c..0198ed7abee 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/FreezeStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/FreezeStepTests.java @@ -92,7 +92,7 @@ public class FreezeStepTests extends AbstractStepTestCase { SetOnce actionCompleted = new SetOnce<>(); FreezeStep step = createRandomInstance(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { actionCompleted.set(complete); @@ -135,7 +135,7 @@ public class FreezeStepTests extends AbstractStepTestCase { SetOnce exceptionThrown = new SetOnce<>(); FreezeStep step = createRandomInstance(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { throw new AssertionError("Unexpected method call"); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStepTests.java index 2d5086ec88f..027b5c811ce 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStepTests.java @@ -77,7 +77,7 @@ public class OpenFollowerIndexStepTests extends AbstractStepTestCase { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { @@ -138,7 +138,7 @@ public class RolloverStepTests extends AbstractStepTestCase { RolloverStep step = createRandomInstance(); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { @@ -183,7 +183,7 @@ public class RolloverStepTests extends AbstractStepTestCase { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { @@ -212,7 +212,7 @@ public class RolloverStepTests extends AbstractStepTestCase { RolloverStep step = createRandomInstance(); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { throw new AssertionError("Unexpected method call"); @@ -237,7 +237,7 @@ public class RolloverStepTests extends AbstractStepTestCase { RolloverStep step = createRandomInstance(); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new AsyncActionStep.Listener() { + step.performAction(indexMetaData, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { throw new AssertionError("Unexpected method call"); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SetSingleNodeAllocateStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SetSingleNodeAllocateStepTests.java index b42ada6956f..525744d68af 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SetSingleNodeAllocateStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SetSingleNodeAllocateStepTests.java @@ -262,7 +262,7 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, clusterState, new Listener() { + step.performAction(indexMetaData, clusterState, null, new Listener() { @Override public void onResponse(boolean complete) { @@ -320,7 +320,7 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, clusterState, new Listener() { + step.performAction(indexMetaData, clusterState, null, new Listener() { @Override public void onResponse(boolean complete) { @@ -375,7 +375,7 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, clusterState, new Listener() { + step.performAction(indexMetaData, clusterState, null, new Listener() { @Override public void onResponse(boolean complete) { @@ -407,7 +407,7 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, clusterState, new Listener() { + step.performAction(indexMetaData, clusterState, null, new Listener() { @Override public void onResponse(boolean complete) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkSetAliasStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkSetAliasStepTests.java index a5c0e4d7146..d1911000f67 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkSetAliasStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkSetAliasStepTests.java @@ -119,7 +119,7 @@ public class ShrinkSetAliasStepTests extends AbstractStepTestCase actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.performAction(indexMetaData, null, null, new Listener() { @Override public void onResponse(boolean complete) { @@ -163,7 +163,7 @@ public class ShrinkSetAliasStepTests extends AbstractStepTestCase exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.performAction(indexMetaData, null, null, new Listener() { @Override public void onResponse(boolean complete) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkStepTests.java index 0cd655cb9d6..e9ceccc4e27 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkStepTests.java @@ -127,7 +127,7 @@ public class ShrinkStepTests extends AbstractStepTestCase { }).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any()); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(sourceIndexMetaData, null, new Listener() { + step.performAction(sourceIndexMetaData, null, null, new Listener() { @Override public void onResponse(boolean complete) { @@ -173,7 +173,7 @@ public class ShrinkStepTests extends AbstractStepTestCase { }).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any()); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.performAction(indexMetaData, null, null, new Listener() { @Override public void onResponse(boolean complete) { @@ -220,7 +220,7 @@ public class ShrinkStepTests extends AbstractStepTestCase { }).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any()); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.performAction(indexMetaData, null, null, new Listener() { @Override public void onResponse(boolean complete) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStepTests.java index 58558c92d25..e92f1dce6a4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStepTests.java @@ -56,7 +56,7 @@ public class UnfollowFollowIndexStepTests extends AbstractUnfollowIndexStepTestC Boolean[] completed = new Boolean[1]; Exception[] failure = new Exception[1]; UnfollowFollowIndexStep step = new UnfollowFollowIndexStep(randomStepKey(), randomStepKey(), client); - step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { completed[0] = complete; @@ -98,7 +98,7 @@ public class UnfollowFollowIndexStepTests extends AbstractUnfollowIndexStepTestC Boolean[] completed = new Boolean[1]; Exception[] failure = new Exception[1]; UnfollowFollowIndexStep step = new UnfollowFollowIndexStep(randomStepKey(), randomStepKey(), client); - step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { completed[0] = complete; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateSettingsStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateSettingsStepTests.java index 22908146af2..28683fa8629 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateSettingsStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UpdateSettingsStepTests.java @@ -100,7 +100,7 @@ public class UpdateSettingsStepTests extends AbstractStepTestCase actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.performAction(indexMetaData, null, null, new Listener() { @Override public void onResponse(boolean complete) { @@ -147,7 +147,7 @@ public class UpdateSettingsStepTests extends AbstractStepTestCase exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.performAction(indexMetaData, null, null, new Listener() { @Override public void onResponse(boolean complete) { diff --git a/x-pack/plugin/ilm/qa/multi-cluster/build.gradle b/x-pack/plugin/ilm/qa/multi-cluster/build.gradle index 59df7338929..34b7cf9e44c 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/build.gradle +++ b/x-pack/plugin/ilm/qa/multi-cluster/build.gradle @@ -4,6 +4,8 @@ apply plugin: 'elasticsearch.standalone-test' dependencies { testCompile project(':x-pack:plugin:ccr:qa') + testCompile project(':x-pack:plugin:core') + testCompile project(':x-pack:plugin:ilm') } task leaderClusterTest(type: RestIntegTestTask) { @@ -25,6 +27,8 @@ leaderClusterTestCluster { leaderClusterTestRunner { systemProperty 'tests.target_cluster', 'leader' + /* To support taking index snapshots, we have to set path.repo setting */ + systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo") } task followClusterTest(type: RestIntegTestTask) {} @@ -47,6 +51,8 @@ followClusterTestCluster { followClusterTestRunner { systemProperty 'tests.target_cluster', 'follow' systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + /* To support taking index snapshots, we have to set path.repo setting */ + systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo") finalizedBy 'leaderClusterTestCluster#stop' } diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java index 797916c7c40..9dbf32b3765 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java @@ -5,20 +5,34 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.xpack.ccr.ESCCRRestTestCase; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.core.indexlifecycle.Phase; +import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction; import java.io.IOException; +import java.io.InputStream; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -89,6 +103,77 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { } } + public void testCCRUnfollowDuringSnapshot() throws Exception { + String indexName = "unfollow-test-index"; + if ("leader".equals(targetCluster)) { + Settings indexSettings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 0) + .build(); + createIndex(indexName, indexSettings); + ensureGreen(indexName); + } else if ("follow".equals(targetCluster)) { + createNewSingletonPolicy("unfollow-only", "hot", new UnfollowAction(), TimeValue.ZERO); + followIndex(indexName, indexName); + + // Create the repository before taking the snapshot. + Request request = new Request("PUT", "/_snapshot/repo"); + request.setJsonEntity(Strings + .toString(JsonXContent.contentBuilder() + .startObject() + .field("type", "fs") + .startObject("settings") + .field("compress", randomBoolean()) + .field("location", System.getProperty("tests.path.repo")) + .field("max_snapshot_bytes_per_sec", "256b") + .endObject() + .endObject())); + assertOK(client().performRequest(request)); + + try (RestClient leaderClient = buildLeaderClient()) { + index(leaderClient, indexName, "1"); + assertDocumentExists(leaderClient, indexName, "1"); + + updateIndexSettings(leaderClient, indexName, Settings.builder() + .put("index.lifecycle.indexing_complete", true) + .build()); + + // start snapshot + request = new Request("PUT", "/_snapshot/repo/snapshot"); + request.addParameter("wait_for_completion", "false"); + request.setJsonEntity("{\"indices\": \"" + indexName + "\"}"); + assertOK(client().performRequest(request)); + + // add policy and expect it to trigger unfollow immediately (while snapshot in progress) + logger.info("--> starting unfollow"); + updatePolicy(indexName, "unfollow-only"); + + assertBusy(() -> { + // Ensure that 'index.lifecycle.indexing_complete' is replicated: + assertThat(getIndexSetting(leaderClient, indexName, "index.lifecycle.indexing_complete"), equalTo("true")); + assertThat(getIndexSetting(client(), indexName, "index.lifecycle.indexing_complete"), equalTo("true")); + // ILM should have unfollowed the follower index, so the following_index setting should have been removed: + // (this controls whether the follow engine is used) + assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), nullValue()); + // Following index should have the document + assertDocumentExists(client(), indexName, "1"); + // ILM should have completed the unfollow + assertILMPolicy(client(), indexName, "unfollow-only", "completed"); + }, 2, TimeUnit.MINUTES); + + // assert that snapshot succeeded + assertThat(getSnapshotState("snapshot"), equalTo("SUCCESS")); + assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot"))); + ResponseException e = expectThrows(ResponseException.class, + () -> client().performRequest(new Request("GET", "/_snapshot/repo/snapshot"))); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); + } + } else { + fail("unexpected target cluster [" + targetCluster + "]"); + } + } + public void testCcrAndIlmWithRollover() throws Exception { String alias = "metrics"; String indexName = "metrics-000001"; @@ -282,4 +367,35 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); } + private void createNewSingletonPolicy(String policyName, String phaseName, LifecycleAction action, TimeValue after) throws IOException { + Phase phase = new Phase(phaseName, after, singletonMap(action.getWriteableName(), action)); + LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policyName, singletonMap(phase.getName(), phase)); + XContentBuilder builder = jsonBuilder(); + lifecyclePolicy.toXContent(builder, null); + final StringEntity entity = new StringEntity( + "{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON); + Request request = new Request("PUT", "_ilm/policy/" + policyName); + request.setEntity(entity); + client().performRequest(request); + } + + public static void updatePolicy(String indexName, String policy) throws IOException { + + Request changePolicyRequest = new Request("PUT", "/" + indexName + "/_settings"); + final StringEntity changePolicyEntity = new StringEntity("{ \"index.lifecycle.name\": \"" + policy + "\" }", + ContentType.APPLICATION_JSON); + changePolicyRequest.setEntity(changePolicyEntity); + assertOK(client().performRequest(changePolicyRequest)); + } + + private String getSnapshotState(String snapshot) throws IOException { + Response response = client().performRequest(new Request("GET", "/_snapshot/repo/" + snapshot)); + Map responseMap; + try (InputStream is = response.getEntity().getContent()) { + responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + @SuppressWarnings("unchecked") Map snapResponse = ((List>) responseMap.get("snapshots")).get(0); + assertThat(snapResponse.get("snapshot"), equalTo(snapshot)); + return (String) snapResponse.get("state"); + } } diff --git a/x-pack/plugin/ilm/qa/multi-node/build.gradle b/x-pack/plugin/ilm/qa/multi-node/build.gradle index edd7f3aad47..5f033626932 100644 --- a/x-pack/plugin/ilm/qa/multi-node/build.gradle +++ b/x-pack/plugin/ilm/qa/multi-node/build.gradle @@ -5,6 +5,11 @@ dependencies { testCompile project(path: xpackProject('plugin').path, configuration: 'testArtifacts') } +integTestRunner { + /* To support taking index snapshots, we have to set path.repo setting */ + systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo") +} + integTestCluster { numNodes = 4 clusterName = 'ilm' @@ -16,5 +21,4 @@ integTestCluster { setting 'xpack.ml.enabled', 'false' setting 'xpack.license.self_generated.type', 'trial' setting 'indices.lifecycle.poll_interval', '1000ms' - } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java index 01eba362711..878ddec5e5f 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.FrozenEngine; import org.elasticsearch.test.rest.ESRestTestCase; @@ -358,6 +359,44 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { indexDocument(); } + public void testDeleteDuringSnapshot() throws Exception { + // Create the repository before taking the snapshot. + Request request = new Request("PUT", "/_snapshot/repo"); + request.setJsonEntity(Strings + .toString(JsonXContent.contentBuilder() + .startObject() + .field("type", "fs") + .startObject("settings") + .field("compress", randomBoolean()) + .field("location", System.getProperty("tests.path.repo")) + .field("max_snapshot_bytes_per_sec", "256b") + .endObject() + .endObject())); + assertOK(client().performRequest(request)); + // create delete policy + createNewSingletonPolicy("delete", new DeleteAction(), TimeValue.timeValueMillis(0)); + // create index without policy + createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + // index document so snapshot actually does something + indexDocument(); + // start snapshot + request = new Request("PUT", "/_snapshot/repo/snapshot"); + request.addParameter("wait_for_completion", "false"); + request.setJsonEntity("{\"indices\": \"" + index + "\"}"); + assertOK(client().performRequest(request)); + // add policy and expect it to trigger delete immediately (while snapshot in progress) + updatePolicy(index, policy); + // assert that index was deleted + assertBusy(() -> assertFalse(indexExists(index)), 2, TimeUnit.MINUTES); + // assert that snapshot is still in progress and clean up + assertThat(getSnapshotState("snapshot"), equalTo("SUCCESS")); + assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot"))); + ResponseException e = expectThrows(ResponseException.class, + () -> client().performRequest(new Request("GET", "/_snapshot/repo/snapshot"))); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); + } + public void testReadOnly() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); @@ -427,6 +466,56 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { expectThrows(ResponseException.class, this::indexDocument); } + public void testShrinkDuringSnapshot() throws Exception { + String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index; + // Create the repository before taking the snapshot. + Request request = new Request("PUT", "/_snapshot/repo"); + request.setJsonEntity(Strings + .toString(JsonXContent.contentBuilder() + .startObject() + .field("type", "fs") + .startObject("settings") + .field("compress", randomBoolean()) + .field("location", System.getProperty("tests.path.repo")) + .field("max_snapshot_bytes_per_sec", "256b") + .endObject() + .endObject())); + assertOK(client().performRequest(request)); + // create delete policy + createNewSingletonPolicy("warm", new ShrinkAction(1), TimeValue.timeValueMillis(0)); + // create index without policy + createIndexWithSettings(index, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + // required so the shrink doesn't wait on SetSingleNodeAllocateStep + .put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_name", "node-0")); + // index document so snapshot actually does something + indexDocument(); + // start snapshot + request = new Request("PUT", "/_snapshot/repo/snapshot"); + request.addParameter("wait_for_completion", "false"); + request.setJsonEntity("{\"indices\": \"" + index + "\"}"); + assertOK(client().performRequest(request)); + // add policy and expect it to trigger shrink immediately (while snapshot in progress) + updatePolicy(index, policy); + // assert that index was shrunk and original index was deleted + assertBusy(() -> { + assertTrue(indexExists(shrunkenIndex)); + assertTrue(aliasExists(shrunkenIndex, index)); + Map settings = getOnlyIndexSettings(shrunkenIndex); + assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(TerminalPolicyStep.KEY)); + assertThat(settings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(1))); + assertThat(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true")); + }, 2, TimeUnit.MINUTES); + expectThrows(ResponseException.class, this::indexDocument); + // assert that snapshot succeeded + assertThat(getSnapshotState("snapshot"), equalTo("SUCCESS")); + assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot"))); + ResponseException e = expectThrows(ResponseException.class, + () -> client().performRequest(new Request("GET", "/_snapshot/repo/snapshot"))); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); + } + public void testFreezeAction() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); @@ -441,6 +530,50 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { }); } + public void testFreezeDuringSnapshot() throws Exception { + // Create the repository before taking the snapshot. + Request request = new Request("PUT", "/_snapshot/repo"); + request.setJsonEntity(Strings + .toString(JsonXContent.contentBuilder() + .startObject() + .field("type", "fs") + .startObject("settings") + .field("compress", randomBoolean()) + .field("location", System.getProperty("tests.path.repo")) + .field("max_snapshot_bytes_per_sec", "256b") + .endObject() + .endObject())); + assertOK(client().performRequest(request)); + // create delete policy + createNewSingletonPolicy("cold", new FreezeAction(), TimeValue.timeValueMillis(0)); + // create index without policy + createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + // index document so snapshot actually does something + indexDocument(); + // start snapshot + request = new Request("PUT", "/_snapshot/repo/snapshot"); + request.addParameter("wait_for_completion", "false"); + request.setJsonEntity("{\"indices\": \"" + index + "\"}"); + assertOK(client().performRequest(request)); + // add policy and expect it to trigger delete immediately (while snapshot in progress) + updatePolicy(index, policy); + // assert that the index froze + assertBusy(() -> { + Map settings = getOnlyIndexSettings(index); + assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY)); + assertThat(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true")); + assertThat(settings.get(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()), equalTo("true")); + assertThat(settings.get(FrozenEngine.INDEX_FROZEN.getKey()), equalTo("true")); + }, 2, TimeUnit.MINUTES); + // assert that snapshot is still in progress and clean up + assertThat(getSnapshotState("snapshot"), equalTo("SUCCESS")); + assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot"))); + ResponseException e = expectThrows(ResponseException.class, + () -> client().performRequest(new Request("GET", "/_snapshot/repo/snapshot"))); + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); + } + public void testSetPriority() throws Exception { createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.INDEX_PRIORITY_SETTING.getKey(), 100)); @@ -763,4 +896,15 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { Response response = client().performRequest(indexRequest); logger.info(response.getStatusLine()); } + + private String getSnapshotState(String snapshot) throws IOException { + Response response = client().performRequest(new Request("GET", "/_snapshot/repo/" + snapshot)); + Map responseMap; + try (InputStream is = response.getEntity().getContent()) { + responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + Map snapResponse = ((List>) responseMap.get("snapshots")).get(0); + assertThat(snapResponse.get("snapshot"), equalTo(snapshot)); + return (String) snapResponse.get("state"); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index baa1d8facd9..2e7d2fbbc55 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -135,8 +135,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin { if (enabled == false || transportClientMode) { return emptyList(); } - indexLifecycleInitialisationService - .set(new IndexLifecycleService(settings, client, clusterService, getClock(), System::currentTimeMillis, xContentRegistry)); + indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, + getClock(), System::currentTimeMillis, xContentRegistry)); return Collections.singletonList(indexLifecycleInitialisationService.get()); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java index 7ba7bcafe55..f6c068d945d 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; @@ -23,6 +24,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.Index; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep; import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep; import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep; @@ -53,14 +55,17 @@ public class IndexLifecycleRunner { private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class); private static final ToXContent.Params STACKTRACE_PARAMS = new ToXContent.MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false")); + private final ThreadPool threadPool; private PolicyStepsRegistry stepRegistry; private ClusterService clusterService; private LongSupplier nowSupplier; - public IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService, LongSupplier nowSupplier) { + public IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService, + ThreadPool threadPool, LongSupplier nowSupplier) { this.stepRegistry = stepRegistry; this.clusterService = clusterService; this.nowSupplier = nowSupplier; + this.threadPool = threadPool; } /** @@ -168,7 +173,8 @@ public class IndexLifecycleRunner { } if (currentStep instanceof AsyncActionStep) { logger.debug("[{}] running policy with async action step [{}]", index, currentStep.getKey()); - ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new AsyncActionStep.Listener() { + ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, + new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()), new AsyncActionStep.Listener() { @Override public void onResponse(boolean complete) { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index b77997a94a3..d143e80340c 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -53,13 +53,14 @@ public class IndexLifecycleService private final PolicyStepsRegistry policyRegistry; private final IndexLifecycleRunner lifecycleRunner; private final Settings settings; + private final ThreadPool threadPool; private Client client; private ClusterService clusterService; private LongSupplier nowSupplier; private SchedulerEngine.Job scheduledJob; - public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, LongSupplier nowSupplier, - NamedXContentRegistry xContentRegistry) { + public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock, + LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) { super(); this.settings = settings; this.client = client; @@ -68,7 +69,8 @@ public class IndexLifecycleService this.nowSupplier = nowSupplier; this.scheduledJob = null; this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client); - this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, nowSupplier); + this.threadPool = threadPool; + this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier); this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings); clusterService.addStateApplier(this); clusterService.addListener(this); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java index f4ab15a30e8..565a9723c8c 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.Version; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -52,6 +53,8 @@ import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction; import org.elasticsearch.xpack.core.indexlifecycle.Step; import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep; +import org.junit.After; +import org.junit.Before; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; @@ -80,6 +83,7 @@ import static org.mockito.Mockito.mock; public class IndexLifecycleRunnerTests extends ESTestCase { private static final NamedXContentRegistry REGISTRY; + private ThreadPool threadPool; static { try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) { @@ -88,6 +92,16 @@ public class IndexLifecycleRunnerTests extends ESTestCase { } } + @Before + public void prepareThreadPool() { + threadPool = new TestThreadPool("test"); + } + + @After + public void shutdown() { + threadPool.shutdownNow(); + } + /** A real policy steps registry where getStep can be overridden so that JSON doesn't have to be parsed */ private class MockPolicyStepsRegistry extends PolicyStepsRegistry { private BiFunction fn = null; @@ -142,7 +156,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { TerminalPolicyStep step = TerminalPolicyStep.INSTANCE; PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -157,7 +171,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { MockClusterStateWaitStep step = new MockClusterStateWaitStep(stepKey, null); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); newState.setPhase(stepKey.getPhase()); newState.setAction(stepKey.getAction()); @@ -197,7 +211,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -258,7 +272,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .build(); ClusterServiceUtils.setState(clusterService, state); long stepTime = randomLong(); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> stepTime); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -307,7 +321,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); // State changes should not run AsyncAction steps @@ -366,7 +380,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .build(); logger.info("--> state: {}", state); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -435,7 +449,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .build(); logger.info("--> state: {}", state); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -458,7 +472,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -476,7 +490,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { step.setWillComplete(true); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -495,7 +509,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { step.setException(expectedException); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -513,7 +527,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { step.setException(expectedException); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -527,7 +541,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { String policyName = "cluster_state_action_policy"; ClusterService clusterService = mock(ClusterService.class); IndexLifecycleRunner runner = new IndexLifecycleRunner(new PolicyStepsRegistry(NamedXContentRegistry.EMPTY, null), - clusterService, () -> 0L); + clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); // verify that no exception is thrown @@ -889,7 +903,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.singletonList(policyMetadata)); Index index = clusterState.metaData().index(indexName).getIndex(); - IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now); + IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, threadPool, () -> now); ClusterState nextClusterState = runner.moveClusterStateToFailedStep(clusterState, indices); IndexLifecycleRunnerTests.assertClusterStateOnNextStep(clusterState, index, errorStepKey, failedStepKey, nextClusterState, now); @@ -923,7 +937,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { lifecycleState.setFailedStep(failedStepKey.getName()); ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.singletonList(policyMetadata)); - IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now); + IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, threadPool, () -> now); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> runner.moveClusterStateToFailedStep(clusterState, indices)); assertThat(exception.getMessage(), equalTo("step [" + failedStepKey @@ -935,7 +949,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { String invalidIndexName = "does_not_exist"; ClusterState clusterState = buildClusterState(existingIndexName, Settings.builder(), LifecycleExecutionState.builder().build(), Collections.emptyList()); - IndexLifecycleRunner runner = new IndexLifecycleRunner(null, null, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(null, null, threadPool, () -> 0L); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> runner.moveClusterStateToFailedStep(clusterState, new String[] { invalidIndexName })); assertThat(exception.getMessage(), equalTo("index [" + invalidIndexName + "] does not exist")); @@ -958,7 +972,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { lifecycleState.setStep(errorStepKey.getName()); lifecycleState.setFailedStep(failedStepKey.getName()); ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList()); - IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now); + IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, threadPool, () -> now); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> runner.moveClusterStateToFailedStep(clusterState, indices)); assertThat(exception.getMessage(), equalTo("index [" + indexName + "] is not associated with an Index Lifecycle Policy")); @@ -979,7 +993,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { lifecycleState.setAction(failedStepKey.getAction()); lifecycleState.setStep(failedStepKey.getName()); ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList()); - IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now); + IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, threadPool, () -> now); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> runner.moveClusterStateToFailedStep(clusterState, indices)); assertThat(exception.getMessage(), equalTo("cannot retry an action for an index [" + indices[0] @@ -1176,7 +1190,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { stepMap, NamedXContentRegistry.EMPTY, null); ClusterService clusterService = mock(ClusterService.class); final AtomicLong now = new AtomicLong(5); - IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, clusterService, now::get); + IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, clusterService, threadPool, now::get); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)) .numberOfReplicas(randomIntBetween(0, 5)) @@ -1360,7 +1374,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { + public void performAction(IndexMetaData indexMetaData, ClusterState currentState, + ClusterStateObserver observer, Listener listener) { executeCount++; if (latch != null) { latch.countDown(); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java index 13fe9c1c690..810f913775e 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java @@ -25,8 +25,10 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; @@ -67,6 +69,7 @@ public class IndexLifecycleServiceTests extends ESTestCase { private DiscoveryNode masterNode; private IndicesAdminClient indicesClient; private long now; + private ThreadPool threadPool; @Before public void prepareServices() { @@ -96,7 +99,9 @@ public class IndexLifecycleServiceTests extends ESTestCase { when(adminClient.indices()).thenReturn(indicesClient); when(client.settings()).thenReturn(Settings.EMPTY); - indexLifecycleService = new IndexLifecycleService(Settings.EMPTY, client, clusterService, clock, () -> now, null); + threadPool = new TestThreadPool("test"); + indexLifecycleService = new IndexLifecycleService(Settings.EMPTY, client, clusterService, threadPool, + clock, () -> now, null); Mockito.verify(clusterService).addListener(indexLifecycleService); Mockito.verify(clusterService).addStateApplier(indexLifecycleService); } @@ -104,6 +109,7 @@ public class IndexLifecycleServiceTests extends ESTestCase { @After public void cleanup() { indexLifecycleService.close(); + threadPool.shutdownNow(); }