diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java index ab777524b3b..3f3ea32dc4d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java @@ -136,6 +136,7 @@ public class ShrinkAction implements LifecycleAction { resizeRequest.getTargetIndexRequest().settings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, indexMetaData.getNumberOfReplicas()) + .put("index.lifecycle.date", indexMetaData.getCreationDate()) .build()); indexMetaData.getAliases().values().spliterator().forEachRemaining(aliasMetaDataObjectCursor -> { resizeRequest.getTargetIndexRequest().alias(new Alias(aliasMetaDataObjectCursor.value.alias())); diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index d17f3333164..d48220182ca 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -56,7 +56,6 @@ import static java.util.Collections.emptyList; public class IndexLifecycle extends Plugin { public static final String NAME = "index_lifecycle"; public static final String BASE_PATH = "/_xpack/index_lifecycle/"; - public static final String THREAD_POOL_NAME = NAME; private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); private Settings settings; private boolean enabled; @@ -71,6 +70,8 @@ public class IndexLifecycle extends Plugin { Setting.Property.Dynamic, Setting.Property.IndexScope); public static final Setting LIFECYCLE_ACTION_SETTING = Setting.simpleString("index.lifecycle.action", Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting LIFECYCLE_INDEX_CREATION_DATE_SETTING = Setting.longSetting("index.lifecycle.date", + -1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope); public IndexLifecycle(Settings settings) { this.settings = settings; @@ -93,9 +94,10 @@ public class IndexLifecycle extends Plugin { @Override public List> getSettings() { return Arrays.asList( - LIFECYCLE_POLL_INTERVAL_SETTING, + LIFECYCLE_POLL_INTERVAL_SETTING, LIFECYCLE_NAME_SETTING, - LIFECYCLE_PHASE_SETTING, + LIFECYCLE_PHASE_SETTING, + LIFECYCLE_INDEX_CREATION_DATE_SETTING, LIFECYCLE_ACTION_SETTING); } diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 9e994f2f745..350210ea77e 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -8,11 +8,15 @@ package org.elasticsearch.xpack.indexlifecycle; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.FormattedMessage; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -20,15 +24,21 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.scheduler.SchedulerEngine; import java.io.Closeable; import java.io.IOException; import java.time.Clock; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.SortedMap; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; /** * A service which runs the {@link LifecyclePolicy}s associated with indexes. @@ -112,26 +122,35 @@ public class IndexLifecycleService extends AbstractComponent logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime()); IndexLifecycleMetadata indexLifecycleMetadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE); SortedMap policies = indexLifecycleMetadata.getPolicies(); + // loop through all indices in cluster state and filter for ones that are + // managed by the Index Lifecycle Service they have a index.lifecycle.name setting + // associated to a policy clusterService.state().metaData().indices().valuesIt().forEachRemaining((idxMeta) -> { String policyName = IndexLifecycle.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); if (Strings.isNullOrEmpty(policyName) == false) { - logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")"); - LifecyclePolicy policy = policies.get(policyName); - if (policy == null) { - logger.error("Unknown lifecycle policy [{}] for index [{}]", policyName, idxMeta.getIndex().getName()); - } else { - try { - policy.execute(new InternalIndexLifecycleContext(idxMeta.getIndex(), client, clusterService, nowSupplier)); - } catch (Exception e) { - logger.error(new FormattedMessage("Failed to execute lifecycle policy [{}] for index [{}]", policyName, - idxMeta.getIndex().getName()), e); - } - } + // ensure that all managed indices have `index.lifecycle.date` set + // and then execute their respective lifecycle policies. + putLifecycleDate(idxMeta).thenRun(() -> executePolicy(idxMeta, policies, policyName)); } }); } } + private void executePolicy(IndexMetaData idxMeta, SortedMap policies, String policyName) { + logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")"); + LifecyclePolicy policy = policies.get(policyName); + if (policy == null) { + logger.error("Unknown lifecycle policy [{}] for index [{}]", policyName, idxMeta.getIndex().getName()); + } else { + try { + policy.execute(new InternalIndexLifecycleContext(idxMeta.getIndex(), client, clusterService, nowSupplier)); + } catch (Exception e) { + logger.error(new FormattedMessage("Failed to execute lifecycle policy [{}] for index [{}]", policyName, + idxMeta.getIndex().getName()), e); + } + } + } + private void installMetadata(IndexLifecycleMetadata lifecycleMetadata) { threadPool.executor(ThreadPool.Names.GENERIC) .execute(() -> clusterService.submitStateUpdateTask("install-index-lifecycle-metadata", new ClusterStateUpdateTask() { @@ -151,8 +170,33 @@ public class IndexLifecycleService extends AbstractComponent })); } + private CompletableFuture putLifecycleDate(IndexMetaData idxMeta) { + CompletableFuture completableFuture = new CompletableFuture<>(); + if (idxMeta.getSettings().hasValue(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey())) { + completableFuture.complete(null); + } else { + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder() + .put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), idxMeta.getCreationDate()).build(), + idxMeta.getIndex().getName()); + client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener() { + @Override + public void onResponse(UpdateSettingsResponse updateSettingsResponse) { + completableFuture.complete(null); + } + + @Override + public void onFailure(Exception e) { + logger.error("unable to update index.lifecycle.date setting on indices", e); + completableFuture.completeExceptionally(e); + } + }); + } + + return completableFuture; + } + @Override - public void close() throws IOException { + public void close() { SchedulerEngine engine = scheduler.get(); if (engine != null) { engine.stop(); diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java index eb92af520c2..c3ca8a1b607 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java @@ -84,8 +84,8 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext { @Override public boolean canExecute(Phase phase) { long now = nowSupplier.getAsLong(); - long indexCreated = getIdxMetaData().getCreationDate(); - return (indexCreated + phase.getAfter().millis()) <= now; + long initialIndexCreated = IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.get(getIdxMetaData().getSettings()); + return (initialIndexCreated + phase.getAfter().millis()) <= now; } @Override diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java index 9cb83fca5f7..4f25d8c03da 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.indexlifecycle; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper; import org.elasticsearch.client.AdminClient; @@ -260,6 +261,14 @@ public class IndexLifecycleServiceTests extends ESTestCase { assertNull(indexLifecycleService.getScheduler()); } + /** + * Checks that a new index does the following successfully: + * + * 1. setting index.lifecycle.date + * 2. sets phase + * 3. sets action + * 4. executes action + */ @SuppressWarnings("unchecked") public void testTriggeredWithMatchingPolicy() { String policyName = randomAlphaOfLengthBetween(1, 20); @@ -289,18 +298,103 @@ public class IndexLifecycleServiceTests extends ESTestCase { when(clusterService.state()).thenReturn(currentState); + SetOnce dateUpdated = new SetOnce<>(); + SetOnce phaseUpdated = new SetOnce<>(); + SetOnce actionUpdated = new SetOnce<>(); doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder() + .put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), + indexMetadata.getCreationDate()).build(), index.getName()); + dateUpdated.set(true); + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + }).doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder() + .put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), "") + .put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), "phase").build(), index.getName()); + phaseUpdated.set(true); + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + }).doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder() + .put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), MockAction.NAME).build(), index.getName()); + actionUpdated.set(true); listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); return null; - }).when(indicesClient).updateSettings(any(), any()); indexLifecycleService.triggered(schedulerEvent); + assertThat(dateUpdated.get(), equalTo(true)); + assertThat(phaseUpdated.get(), equalTo(true)); + assertThat(actionUpdated.get(), equalTo(true)); assertThat(mockAction.getExecutedCount(), equalTo(1L)); } + /** + * Check that a policy is executed without first setting the `index.lifecycle.date` setting + */ + @SuppressWarnings("unchecked") + public void testTriggeredWithDateSettingAlreadyPresent() { + String policyName = randomAlphaOfLengthBetween(1, 20); + MockAction mockAction = new MockAction(); + Phase phase = new Phase("phase", TimeValue.ZERO, Collections.singletonMap("action", mockAction)); + LifecyclePolicy policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName, + Collections.singletonMap(phase.getName(), phase)); + SortedMap policyMap = new TreeMap<>(); + policyMap.put(policyName, policy); + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + long creationDate = randomNonNegativeLong(); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT) + .put(IndexLifecycle.LIFECYCLE_NAME_SETTING.getKey(), policyName) + .put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), creationDate)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).creationDate(creationDate).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder() + .fPut(index.getName(), indexMetadata); + MetaData metaData = MetaData.builder() + .putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap)) + .indices(indices.build()) + .persistentSettings(settings(Version.CURRENT).build()) + .build(); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build()) + .build(); + + SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event(IndexLifecycle.NAME, randomLong(), randomLong()); + + when(clusterService.state()).thenReturn(currentState); + + SetOnce dateUpdated = new SetOnce<>(); + doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + try { + UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder() + .put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), + indexMetadata.getCreationDate()).build(), index.getName()); + dateUpdated.set(true); + } catch (AssertionError e) { + // noop: here because we are either updating the phase or action prior to executing MockAction + } + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + }).when(indicesClient).updateSettings(any(), any()); + + indexLifecycleService.triggered(schedulerEvent); + + assertNull(dateUpdated.get()); + assertThat(mockAction.getExecutedCount(), equalTo(1L)); + + } + /** * Check that if an index has an unknown lifecycle policy set it does not * execute any policy but does process other indexes. diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java index 1af76100fc1..7c49c02fe65 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java @@ -40,8 +40,6 @@ public class InternalIndexLifecycleContextTests extends ESTestCase { MetaData metaData = MetaData.builder().indices(indices.build()) .persistentSettings(settings(Version.CURRENT).build()).build(); return ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build(); -// ClusterService clusterService = Mockito.mock(ClusterService.class); -// Mockito.when(clusterService.state()).thenReturn(clusterState); } public void testSetPhase() { @@ -495,7 +493,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase { long now = random().longs(creationDate, creationDate + after.millis()).iterator().nextLong(); IndexMetaData idxMeta = IndexMetaData.builder(TEST_INDEX.getName()) - .settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build()) + .settings(Settings.builder().put("index.version.created", 7000001L).put("index.lifecycle.date", creationDate).build()) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); ClusterState clusterState = getClusterState(idxMeta); ClusterService clusterService = Mockito.mock(ClusterService.class); @@ -514,7 +512,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase { long now = creationDate + after.millis(); IndexMetaData idxMeta = IndexMetaData.builder(TEST_INDEX.getName()) - .settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build()) + .settings(Settings.builder().put("index.version.created", 7000001L).put("index.lifecycle.date", creationDate).build()) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); ClusterState clusterState = getClusterState(idxMeta); ClusterService clusterService = Mockito.mock(ClusterService.class); @@ -533,7 +531,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase { long now = random().longs(creationDate + after.millis(), Long.MAX_VALUE).iterator().nextLong(); IndexMetaData idxMeta = IndexMetaData.builder(TEST_INDEX.getName()) - .settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build()) + .settings(Settings.builder().put("index.version.created", 7000001L).put("index.lifecycle.date", creationDate).build()) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); ClusterState clusterState = getClusterState(idxMeta); ClusterService clusterService = Mockito.mock(ClusterService.class); diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java index 2551a0019e2..456819c031e 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java @@ -226,10 +226,12 @@ public class ShrinkActionTests extends AbstractSerializingTestCase Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20)); int numberOfShards = randomIntBetween(1, 5); int numberOfReplicas = randomIntBetween(1, 5); + long creationDate = randomNonNegativeLong(); ClusterService clusterService = Mockito.mock(ClusterService.class); IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) .settings(settings(Version.CURRENT)) .putAlias(AliasMetaData.builder("my_alias")) + .creationDate(creationDate) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(numberOfReplicas).build(); ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), indexMetadata); @@ -266,7 +268,8 @@ public class ShrinkActionTests extends AbstractSerializingTestCase assertThat(request.getTargetIndexRequest().aliases(), equalTo(Collections.singleton(new Alias("my_alias")))); assertThat(request.getTargetIndexRequest().settings(), equalTo(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build())); + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put("index.lifecycle.date", creationDate).build())); assertThat(request.getTargetIndexRequest().index(), equalTo(targetIndex.getName())); ResizeResponse resizeResponse = ResizeAction.INSTANCE.newResponse(); resizeResponse.readFrom(StreamInput.wrap(new byte[] { 1, 1, 1, 1, 1 }));